[S-RBAC] Change policies for port's binding:profile field

According to the neutron API-REF [1] port's "binding:profile" field is
intended to be used for the "machine-machine communication for compute
services like Nova, Ironic or Zun to pass information to a Neutron
back-end." so it should be by allowed only for the users with the
SERVICE role granted, not even for ADMIN.
This patch updates that policies to be available only for SERVICE role
when new, secure RBAC policies are enabled.

Additionally this patch updates some policies for create, update and get
port APIs to make them all work in the same way and allow them for the
SERVICE users too.

Finally this new policy for create/update_port:binding:profile have to
be overwritten in the fullstack tests to be allowed also for admin user.
It is done by adding custom policy file for the fullstack tests only.

[1] https://docs.openstack.org/api-ref/network/v2/index.html#create-port

Closes-Bug: #2052937
Change-Id: I5c0094ff21439fe8977cfc623789a09067e6a895
This commit is contained in:
Slawek Kaplonski 2024-02-15 09:50:27 +01:00
parent a8fe0cb369
commit a644b3c62b
11 changed files with 112 additions and 96 deletions

View File

@ -0,0 +1,4 @@
---
# Those are API policy rule overwritten to use in the fullstack tests only
"create_port:binding:profile": "rule:admin_only or rule:service_api"
"update_port:binding:profile": "rule:admin_only or rule:service_api"

View File

@ -66,7 +66,9 @@ rules = [
policy.DocumentedRuleDefault(
name='create_port',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
check_str=neutron_policy.policy_or(
base.ADMIN_OR_PROJECT_MEMBER,
base.SERVICE),
scope_types=['project'],
description='Create a port',
operations=ACTION_POST,
@ -184,7 +186,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_port:binding:host_id',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description=(
'Specify ``binding:host_id`` '
@ -199,7 +201,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_port:binding:profile',
check_str=base.ADMIN,
check_str=base.SERVICE,
scope_types=['project'],
description=(
'Specify ``binding:profile`` attribute '
@ -214,7 +216,9 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_port:binding:vnic_type',
check_str=base.ADMIN_OR_PROJECT_MEMBER,
check_str=neutron_policy.policy_or(
base.ADMIN_OR_PROJECT_MEMBER,
base.SERVICE),
scope_types=['project'],
description=(
'Specify ``binding:vnic_type`` '
@ -302,7 +306,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_port:binding:vif_type',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description='Get ``binding:vif_type`` attribute of a port',
operations=ACTION_GET,
@ -314,7 +318,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_port:binding:vif_details',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description='Get ``binding:vif_details`` attribute of a port',
operations=ACTION_GET,
@ -326,7 +330,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_port:binding:host_id',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description='Get ``binding:host_id`` attribute of a port',
operations=ACTION_GET,
@ -338,7 +342,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_port:binding:profile',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description='Get ``binding:profile`` attribute of a port',
operations=ACTION_GET,
@ -511,7 +515,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_port:binding:host_id',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description='Update ``binding:host_id`` attribute of a port',
operations=ACTION_PUT,
@ -523,7 +527,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_port:binding:profile',
check_str=base.ADMIN,
check_str=base.SERVICE,
scope_types=['project'],
description='Update ``binding:profile`` attribute of a port',
operations=ACTION_PUT,

View File

@ -164,7 +164,7 @@ class NeutronConfigFixture(ConfigFixture):
return c_helpers.find_sample_file('api-paste.ini')
def _generate_policy_yaml(self):
return c_helpers.find_sample_file('policy.yaml')
return c_helpers.find_sample_file('fullstack_tests_policy.yaml')
def get_host(self):
return self.config['DEFAULT']['host']

View File

@ -109,13 +109,13 @@ class TestPortBinding(base.TestOVNFunctionalBase):
'tenant_id': self._tenant_id})
port_req = self.new_create_request('ports', port_data, self.fmt,
as_admin=True)
as_service=True)
port_res = port_req.get_response(self.api)
p = self.deserialize(self.fmt, port_res)
port_id = p['port']['id']
else:
port_req = self.new_update_request('ports', port_data, port_id,
self.fmt, as_admin=True)
self.fmt, as_service=True)
port_res = port_req.get_response(self.api)
self.deserialize(self.fmt, port_res)
@ -740,7 +740,7 @@ class TestExternalPorts(base.TestOVNFunctionalBase):
ovn_const.PORT_CAP_SWITCHDEV]}}}
port_req = self.new_create_request('ports', port_data, self.fmt,
as_admin=True)
as_service=True)
port_res = port_req.get_response(self.api)
port = self.deserialize(self.fmt, port_res)['port']
@ -789,7 +789,7 @@ class TestExternalPorts(base.TestOVNFunctionalBase):
ovn_const.PORT_CAP_SWITCHDEV]}}}
port_req = self.new_update_request(
'ports', port_upt_data, port['id'], self.fmt,
as_admin=True)
as_service=True)
port_res = port_req.get_response(self.api)
port = self.deserialize(self.fmt, port_res)['port']

View File

@ -87,7 +87,7 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def _test_create_port_binding_profile(self, profile):
profile_arg = {portbindings.PROFILE: profile}
with self.port(is_admin=True,
with self.port(is_service=True,
arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
port_id = port['port']['id']
@ -107,7 +107,7 @@ class PortBindingsTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
self._check_port_binding_profile(port['port'])
port_id = port['port']['id']
port = self._update('ports', port_id, {'port': profile_arg},
as_admin=True)['port']
as_service=True)['port']
self._check_port_binding_profile(port, profile)
port = self._show('ports', port_id, as_admin=True)['port']
self._check_port_binding_profile(port, profile)

View File

@ -496,12 +496,14 @@ class AdminTests(PortAPITestCase):
'create_port:binding:host_id', self.alt_target))
def test_create_port_with_binding_profile(self):
self.assertTrue(
policy.enforce(self.context,
'create_port:binding:profile', self.target))
self.assertTrue(
policy.enforce(self.context,
'create_port:binding:profile', self.alt_target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:profile',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:profile',
self.alt_target)
def test_create_port_with_binding_vnic_type(self):
self.assertTrue(
@ -679,12 +681,14 @@ class AdminTests(PortAPITestCase):
'update_port:binding:host_id', self.alt_target))
def test_update_port_with_binding_profile(self):
self.assertTrue(
policy.enforce(self.context,
'update_port:binding:profile', self.target))
self.assertTrue(
policy.enforce(self.context,
'update_port:binding:profile', self.alt_target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:profile',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:profile',
self.alt_target)
def test_update_port_with_binding_vnic_type(self):
self.assertTrue(
@ -1215,9 +1219,8 @@ class ServiceRoleTests(PortAPITestCase):
self.context = self.service_ctx
def test_create_port(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port', self.target)
self.assertTrue(
policy.enforce(self.context, 'create_port', self.target))
def test_create_port_with_device_owner(self):
self.assertTrue(
@ -1251,22 +1254,19 @@ class ServiceRoleTests(PortAPITestCase):
self.target))
def test_create_port_with_binding_host_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:host_id',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:binding:host_id', self.target))
def test_create_port_with_binding_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:profile',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:binding:profile', self.target))
def test_create_port_with_binding_vnic_type(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:vnic_type',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:binding:vnic_type', self.target))
def test_create_port_with_allowed_address_pairs(self):
self.assertRaises(
@ -1294,28 +1294,24 @@ class ServiceRoleTests(PortAPITestCase):
policy.enforce(self.context, 'get_port', self.target))
def test_get_port_binding_vif_type(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:binding:vif_type',
self.target)
self.assertTrue(
policy.enforce(self.context,
'get_port:binding:vif_type', self.target))
def test_get_port_binding_vif_details(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:binding:vif_details',
self.target)
self.assertTrue(
policy.enforce(self.context,
'get_port:binding:vif_details', self.target))
def test_get_port_binding_host_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:binding:host_id',
self.target)
self.assertTrue(
policy.enforce(self.context,
'get_port:binding:host_id', self.target))
def test_get_port_binding_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:binding:profile',
self.target)
self.assertTrue(
policy.enforce(self.context,
'get_port:binding:profile', self.target))
def test_get_port_resource_request(self):
self.assertRaises(
@ -1359,16 +1355,14 @@ class ServiceRoleTests(PortAPITestCase):
self.target))
def test_update_port_with_binding_host_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:host_id',
self.target)
self.assertTrue(
policy.enforce(self.context,
'update_port:binding:host_id', self.target))
def test_update_port_with_binding_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:profile',
self.target)
self.assertTrue(
policy.enforce(self.context,
'update_port:binding:profile', self.target))
def test_update_port_with_binding_vnic_type(self):
self.assertTrue(

View File

@ -262,11 +262,12 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _service_req(self, method, resource, data=None, fmt=None, id=None,
params=None, action=None, subresource=None, sub_id=None,
ctx=None, headers=None):
ctx=None, headers=None, tenant_id=None):
tenant_id = tenant_id or 'service-project'
req = self._req(method, resource, data, fmt, id, params, action,
subresource, sub_id, ctx, headers)
req.environ['neutron.context'] = context.Context(
'service-user', 'service-project', roles=['service'])
'service-user', tenant_id, roles=['service'])
return req
def _member_req(self, method, resource, data=None, fmt=None, id=None,
@ -291,12 +292,16 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def new_create_request(self, resource, data, fmt=None, id=None,
subresource=None, context=None, tenant_id=None,
as_admin=False):
as_admin=False, as_service=False):
tenant_id = tenant_id or self._tenant_id
if as_admin:
return self._admin_req(
'POST', resource, data, fmt, id=id,
subresource=subresource, ctx=context, tenant_id=tenant_id)
elif as_service:
return self._service_req(
'POST', resource, data, fmt, id=id,
subresource=subresource, ctx=context, tenant_id=tenant_id)
return self._member_req('POST', resource, data, fmt, id=id,
subresource=subresource, ctx=context,
tenant_id=tenant_id)
@ -345,7 +350,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def new_update_request(self, resource, data, id, fmt=None,
subresource=None, context=None, sub_id=None,
headers=None, as_admin=False, tenant_id=None):
headers=None, as_admin=False, as_service=False,
tenant_id=None):
tenant_id = tenant_id or self._tenant_id
if as_admin:
return self._admin_req(
@ -353,6 +359,10 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
sub_id=sub_id, ctx=context, headers=headers,
tenant_id=tenant_id
)
elif as_service:
return self._service_req(
'PUT', resource, data, fmt, id=id,
subresource=subresource, ctx=context, tenant_id=tenant_id)
return self._member_req(
'PUT', resource, data, fmt, id=id, subresource=subresource,
sub_id=sub_id, ctx=context, headers=headers, tenant_id=tenant_id
@ -407,7 +417,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
return req.get_response(self.api)
def _create_bulk(self, fmt, number, resource, data, name='test',
tenant_id=None, as_admin=False, **kwargs):
tenant_id=None, as_admin=False, as_service=False,
**kwargs):
"""Creates a bulk request for any kind of resource."""
tenant_id = tenant_id or self._tenant_id
objects = []
@ -421,7 +432,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
req_data = {collection: objects}
req = self.new_create_request(collection, req_data, fmt,
tenant_id=tenant_id,
as_admin=as_admin)
as_admin=as_admin,
as_service=as_service)
return req.get_response(self.api)
def _create_network(self, fmt, name, admin_state_up,
@ -517,7 +529,7 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
return subnetpool_res
def _create_port(self, fmt, net_id, expected_res_status=None,
arg_list=None, is_admin=False,
arg_list=None, is_admin=False, is_service=False,
tenant_id=None, **kwargs):
tenant_id = tenant_id or self._tenant_id
data = {'port': {'network_id': net_id,
@ -541,7 +553,8 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
data['port']['device_id'] = device_id
port_req = self.new_create_request('ports', data, fmt,
tenant_id=tenant_id,
as_admin=is_admin)
as_admin=is_admin,
as_service=is_service)
port_res = port_req.get_response(self.api)
if expected_res_status:
@ -564,12 +577,12 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _create_port_bulk(self, fmt, number, net_id, name,
admin_state_up, tenant_id=None, as_admin=False,
**kwargs):
as_service=False, **kwargs):
base_data = {'port': {'network_id': net_id,
'admin_state_up': admin_state_up}}
return self._create_bulk(fmt, number, 'port', base_data,
tenant_id=tenant_id, as_admin=as_admin,
**kwargs)
as_service=as_service, **kwargs)
def _make_network(self, fmt, name, admin_state_up, as_admin=False,
**kwargs):
@ -732,10 +745,11 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _update(self, resource, id, new_data,
expected_code=webob.exc.HTTPOk.code, headers=None,
request_tenant_id=None, as_admin=False):
request_tenant_id=None, as_admin=False, as_service=False):
req = self.new_update_request(
resource, new_data, id, headers=headers,
tenant_id=request_tenant_id, as_admin=as_admin)
tenant_id=request_tenant_id, as_admin=as_admin,
as_service=as_service)
res = req.get_response(self._api_for_resource(resource))
self.assertEqual(expected_code, res.status_int)
return self.deserialize(self.fmt, res)

View File

@ -4449,7 +4449,7 @@ class TestOVNParentTagPortBinding(OVNMechanismDriverTestCase):
self._create_port(
self.fmt, n['network']['id'],
expected_res_status=404,
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
**binding)
@ -4461,7 +4461,7 @@ class TestOVNParentTagPortBinding(OVNMechanismDriverTestCase):
with self.port(s) as p:
binding[OVN_PROFILE]['parent_name'] = p['port']['id']
res = self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
**binding)
port = self.deserialize(self.fmt, res)
@ -4476,7 +4476,7 @@ class TestOVNParentTagPortBinding(OVNMechanismDriverTestCase):
with self.port(s) as p:
binding[OVN_PROFILE]['parent_name'] = p['port']['id']
self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
expected_res_status=400,
**binding)
@ -4490,7 +4490,7 @@ class TestOVNVtepPortBinding(OVNMechanismDriverTestCase):
with self.network() as n:
with self.subnet(n):
res = self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
**binding)
port = self.deserialize(self.fmt, res)
@ -4502,7 +4502,7 @@ class TestOVNVtepPortBinding(OVNMechanismDriverTestCase):
with self.network() as n:
with self.subnet(n):
self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
expected_res_status=400,
**binding)
@ -4512,7 +4512,7 @@ class TestOVNVtepPortBinding(OVNMechanismDriverTestCase):
with self.network() as n:
with self.subnet(n):
self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
expected_res_status=400,
**binding)
@ -4523,7 +4523,7 @@ class TestOVNVtepPortBinding(OVNMechanismDriverTestCase):
with self.network() as n:
with self.subnet(n):
self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
expected_res_status=400,
**binding)
@ -4535,7 +4535,7 @@ class TestOVNVtepPortBinding(OVNMechanismDriverTestCase):
with self.network() as n:
with self.subnet(n):
self._create_port(self.fmt, n['network']['id'],
is_admin=True,
is_service=True,
arg_list=(OVN_PROFILE,),
expected_res_status=404,
**binding)

View File

@ -1522,7 +1522,7 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
portbindings.VNIC_TYPE: 'macvtap',
portbindings.PROFILE: {'bar': 'bar'}}}
res = self._create_port_bulk(self.fmt, 2, net['network']['id'],
'test', True, as_admin=True,
'test', True, as_service=True,
override=overrides)
ports = self.deserialize(self.fmt, res)['ports']
self.assertCountEqual(['direct', 'macvtap'],
@ -2577,7 +2577,7 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
profile_arg = {portbindings.PROFILE: {'d': s}}
try:
with self.port(expected_res_status=400,
is_admin=True,
is_service=True,
arg_list=(portbindings.PROFILE,),
**profile_arg):
pass
@ -2587,7 +2587,7 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
def test_remove_port_binding_profile(self):
profile = {'e': 5}
profile_arg = {portbindings.PROFILE: profile}
with self.port(is_admin=True,
with self.port(is_service=True,
arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
self._check_port_binding_profile(port['port'], profile)
@ -2595,7 +2595,7 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
profile_arg = {portbindings.PROFILE: None}
port = self._update('ports', port_id,
{'port': profile_arg},
as_admin=True)['port']
as_service=True)['port']
self._check_port_binding_profile(port)
port = self._show('ports', port_id, as_admin=True)['port']
self._check_port_binding_profile(port)
@ -2790,7 +2790,7 @@ class TestMl2PortBinding(Ml2PluginV2TestCase,
def test_port_binding_profile_not_changed(self):
profile = {'e': 5}
profile_arg = {portbindings.PROFILE: profile}
with self.port(is_admin=True,
with self.port(is_service=True,
arg_list=(portbindings.PROFILE,),
**profile_arg) as port:
self._check_port_binding_profile(port['port'], profile)

View File

@ -716,7 +716,7 @@ class ExtendedPortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
mechanism_test.TestMechanismDriver, '_check_port_context'
):
req = self.new_update_request('ports', update_body, port_id,
as_admin=True)
as_service=True)
self.assertEqual(200, req.get_response(self.api).status_int)
def test_bind_non_pf_port_with_mac_port_not_updated(self):
@ -857,7 +857,7 @@ class ExtendedPortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
mechanism_test.TestMechanismDriver, '_check_port_context'
):
req = self.new_update_request('ports', update_body, port['id'],
as_admin=True)
as_service=True)
self.assertEqual(200, req.get_response(self.api).status_int)
# Neutron expected to reset the MAC to a generated one so that the

View File

@ -1379,7 +1379,7 @@ class L3DvrSchedulerTestCase(L3SchedulerBaseMixin,
subnet_ids = []
subnet_ids.append(subnet['subnet']['id'])
with self.port(subnet=subnet,
is_admin=True,
is_service=True,
device_owner=DEVICE_OWNER_COMPUTE,
arg_list=('admin_state_up',
portbindings.PROFILE,), **host_args):