OpenStack Neutron Source Code Analysis (3): linuxbridge-agent
Let's start directly from the source code.
neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py:
common_config.init(sys.argv[1:])

common_config.setup_logging()
try:
    interface_mappings = n_utils.parse_mappings(
        cfg.CONF.LINUX_BRIDGE.physical_interface_mappings)
except ValueError as e:
    LOG.error(_LE("Parsing physical_interface_mappings failed: %s. "
                  "Agent terminated!"), e)
    sys.exit(1)
LOG.info(_LI("Interface mappings: %s"), interface_mappings)

try:
    bridge_mappings = n_utils.parse_mappings(
        cfg.CONF.LINUX_BRIDGE.bridge_mappings)
except ValueError as e:
    LOG.error(_LE("Parsing bridge_mappings failed: %s. "
                  "Agent terminated!"), e)
    sys.exit(1)
LOG.info(_LI("Bridge mappings: %s"), bridge_mappings)
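First, the two options physical_interface_mappings and bridge_mappings are parsed from the config file "/etc/neutron/plugins/ml2/linuxbridge_agent.ini" (they appear there as "#physical_interface_mappings =" and "#bridge_mappings =").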
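To make the parsing step concrete, here is a minimal sketch of what a mapping parser of this kind does. The function name parse_mappings_sketch and its exact error messages are hypothetical; it is not neutron's n_utils.parse_mappings, only an illustration of turning "key:value" entries into a dict and raising ValueError on bad input, which is the exception the agent code above catches.

```python
def parse_mappings_sketch(mapping_list):
    """Turn ['physnet1:eth1', ...] into {'physnet1': 'eth1', ...}.

    Hypothetical stand-in for illustration; raises ValueError on
    malformed or duplicate entries.
    """
    mappings = {}
    for entry in mapping_list:
        entry = entry.strip()
        if not entry:
            # Ignore empty entries such as those left by a trailing comma.
            continue
        parts = entry.split(":")
        if len(parts) != 2:
            raise ValueError("Invalid mapping: '%s'" % entry)
        key, value = parts[0].strip(), parts[1].strip()
        if not key or not value:
            raise ValueError("Missing key or value in mapping: '%s'" % entry)
        if key in mappings:
            raise ValueError("Key %s in mapping '%s' not unique" % (key, entry))
        mappings[key] = value
    return mappings
```

With this sketch, a configured value such as physnet1:eth1,physnet2:eth2 (split on commas into a list) would become a dict keyed by physical network name.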
These two settings are then used to initialize the "LinuxBridgeManager" object:
manager = LinuxBridgeManager(bridge_mappings, interface_mappings)
neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_neutron_agent.py:
class LinuxBridgeManager(amb.CommonAgentManagerBase):
    def __init__(self, bridge_mappings, interface_mappings):
        super(LinuxBridgeManager, self).__init__()
        self.bridge_mappings = bridge_mappings
        self.interface_mappings = interface_mappings
        self.validate_interface_mappings()
        self.validate_bridge_mappings()
        self.ip = ip_lib.IPWrapper()
        # VXLAN related parameters:
        self.local_ip = cfg.CONF.VXLAN.local_ip
        self.vxlan_mode = lconst.VXLAN_NONE
        if cfg.CONF.VXLAN.enable_vxlan:
            device = self.get_local_ip_device()
            self.validate_vxlan_group_with_local_ip()
            self.local_int = device.name
            self.check_vxlan_support()
As you can see, LinuxBridgeManager inherits from CommonAgentManagerBase, an abstract class that defines the methods subclasses must implement:
neutron/plugins/ml2/drivers/agent/_agent_manager_base.py:
# Method docstrings omitted; "..." marks the omitted bodies.
@six.add_metaclass(abc.ABCMeta)
class CommonAgentManagerBase(object):
    @abc.abstractmethod
    def ensure_port_admin_state(self, device, admin_state_up):
        ...

    @abc.abstractmethod
    def get_agent_configurations(self):
        ...

    @abc.abstractmethod
    def get_agent_id(self):
        ...

    @abc.abstractmethod
    def get_all_devices(self):
        ...

    @abc.abstractmethod
    def get_devices_modified_timestamps(self, devices):
        ...

    @abc.abstractmethod
    def get_extension_driver_type(self):
        ...

    @abc.abstractmethod
    def get_rpc_callbacks(self, context, agent, sg_agent):
        ...

    @abc.abstractmethod
    def get_rpc_consumers(self):
        ...

    @abc.abstractmethod
    def plug_interface(self, network_id, network_segment, device,
                       device_owner):
        ...

    @abc.abstractmethod
    def setup_arp_spoofing_protection(self, device, device_details):
        ...

    @abc.abstractmethod
    def delete_arp_spoofing_protection(self, devices):
        ...

    @abc.abstractmethod
    def delete_unreferenced_arp_protection(self, current_devices):
        ...
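As a small illustration of why the abstract base class matters, the sketch below shows how Python's abc machinery refuses to instantiate a subclass that leaves an abstract method unimplemented. The class names (ManagerBaseSketch, IncompleteManager, CompleteManager) and the helper can_instantiate are hypothetical, only two of the methods are modeled, and it uses Python 3's abc.ABC rather than six.add_metaclass as in the neutron source.

```python
import abc


class ManagerBaseSketch(abc.ABC):
    """Hypothetical, reduced stand-in for CommonAgentManagerBase."""

    @abc.abstractmethod
    def get_agent_id(self):
        """Return a unique agent ID."""

    @abc.abstractmethod
    def get_all_devices(self):
        """Return the devices the agent manages."""


class IncompleteManager(ManagerBaseSketch):
    # Implements only one of the two abstract methods.
    def get_agent_id(self):
        return "lb000000000001"


class CompleteManager(IncompleteManager):
    # Supplies the remaining abstract method, so it can be instantiated.
    def get_all_devices(self):
        return set()


def can_instantiate(cls):
    """Return True if cls() succeeds, False if abc rejects it."""
    try:
        cls()
    except TypeError:
        return False
    return True
```

Here can_instantiate(IncompleteManager) is False because get_all_devices is still abstract, while CompleteManager can be created normally.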
Back in LinuxBridgeManager.__init__:
        self.bridge_mappings = bridge_mappings
        self.interface_mappings = interface_mappings
        self.validate_interface_mappings()
        self.validate_bridge_mappings()
        self.ip = ip_lib.IPWrapper()
        # VXLAN related parameters:
        self.local_ip = cfg.CONF.VXLAN.local_ip
        self.vxlan_mode = lconst.VXLAN_NONE
        if cfg.CONF.VXLAN.enable_vxlan:
            device = self.get_local_ip_device()
            self.validate_vxlan_group_with_local_ip()
            self.local_int = device.name
            self.check_vxlan_support()
The constructor validates the two mappings from the config file and then, if VXLAN is enabled, performs the corresponding VXLAN checks.
Once the LinuxBridgeManager object has been initialized, the linuxbridge capabilities are registered:
linuxbridge_capabilities.register()
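What this code really does is register a callback function to be invoked when a given event occurs for a given agent type. The mechanism itself is implemented under the neutron/callbacks directory.
Here, the registration means that once the 'Linux bridge agent' has finished initializing ("after_init"), the "init_handler" function in "neutron/services/trunk/drivers/linuxbridge/agent/driver.py" is called.
The registration code is as follows: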
neutron/plugins/ml2/drivers/linuxbridge/agent/linuxbridge_capabilities.py:
def register():
    """Register Linux Bridge capabilities."""
    # Add capabilities to be loaded during agent initialization
    capabilities.register(driver.init_handler,
                          constants.AGENT_TYPE_LINUXBRIDGE)
This way, once the agent has finished initializing, the following is called:
neutron/services/trunk/drivers/linuxbridge/agent/driver.py:
def init_handler(resource, event, trigger, agent=None):
    """Handler for agent init event."""
    if agent:
        LinuxBridgeTrunkDriver()
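The register/notify pattern described above can be sketched with a tiny in-memory registry. Everything here is a simplified stand-in written for illustration: the module-level _callbacks dict, subscribe, notify and the string "after_init" are assumptions about the shape of the mechanism, not the actual code in neutron/callbacks, and this init_handler only records its arguments instead of creating a trunk driver.

```python
import collections

AFTER_INIT = "after_init"  # assumed event name, as quoted in the text

# Maps (resource, event) to the list of subscribed callbacks.
_callbacks = collections.defaultdict(list)


def subscribe(callback, resource, event):
    """Remember callback for the given resource/event pair."""
    _callbacks[(resource, event)].append(callback)


def notify(resource, event, trigger, **kwargs):
    """Invoke every callback subscribed to this resource/event pair."""
    for callback in list(_callbacks[(resource, event)]):
        callback(resource, event, trigger, **kwargs)


def register(callback, agent_type):
    """Sketch of capabilities.register: subscribe to the after-init event."""
    subscribe(callback, agent_type, AFTER_INIT)


def notify_init_event(agent_type, agent):
    """Sketch of capabilities.notify_init_event."""
    notify(agent_type, AFTER_INIT, agent, agent=agent)


initialized = []


def init_handler(resource, event, trigger, agent=None):
    """Stand-in handler: record the call instead of building a driver."""
    if agent:
        initialized.append((resource, event, agent))


register(init_handler, "Linux bridge agent")
notify_init_event("Linux bridge agent", "fake-agent")
```

The point of the design is decoupling: the agent loop only announces "initialization finished for this agent type", and any feature that registered interest (here, the trunk driver) reacts without the loop knowing about it.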
After the event callback is registered, the following two options are read:
polling_interval = cfg.CONF.AGENT.polling_interval
quitting_rpc_timeout = cfg.CONF.AGENT.quitting_rpc_timeout
Then the key core object, CommonAgentLoop, is created:
neutron/plugins/ml2/drivers/agent/_common_agent.py:
@profiler.trace_cls("rpc")
class CommonAgentLoop(service.Service):

    def __init__(self, manager, polling_interval,
                 quitting_rpc_timeout, agent_type, agent_binary):
        """Constructor.

        :param manager: the manager object containing the impl specifics
        :param polling_interval: interval (secs) to poll DB.
        :param quitting_rpc_timeout: timeout in seconds for rpc calls after
               stop is called.
        :param agent_type: Specifies the type of the agent
        :param agent_binary: The agent binary string
        """
        super(CommonAgentLoop, self).__init__()
        self.mgr = manager
        self._validate_manager_class()
        self.polling_interval = polling_interval
        self.quitting_rpc_timeout = quitting_rpc_timeout
        self.agent_type = agent_type
        self.agent_binary = agent_binary
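As you can see, it implements the service.Service interface, so, following the analysis in the previous two parts of this series, it can be started by a launcher such as ProcessLauncher. The code below confirms that it is handed to service.launch: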
agent = ca.CommonAgentLoop(manager, polling_interval, quitting_rpc_timeout,
                           constants.AGENT_TYPE_LINUXBRIDGE,
                           LB_AGENT_BINARY)
setup_profiler.setup("neutron-linuxbridge-agent", cfg.CONF.host)
LOG.info(_LI("Agent initialized successfully, now running... "))
launcher = service.launch(cfg.CONF, agent)
launcher.wait()
So to see what the agent actually does, we only need to look at CommonAgentLoop's start method:
def start(self):
    self.prevent_arp_spoofing = cfg.CONF.AGENT.prevent_arp_spoofing

    # stores all configured ports on agent
    self.network_ports = collections.defaultdict(list)
    # flag to do a sync after revival
    self.fullsync = False
    self.context = context.get_admin_context_without_session()
    self.setup_rpc()
    self.init_extension_manager(self.connection)

    configurations = {'extensions': self.ext_manager.names()}
    configurations.update(self.mgr.get_agent_configurations())

    #TODO(mangelajo): optimize resource_versions (see ovs agent)
    self.agent_state = {
        'binary': self.agent_binary,
        'host': cfg.CONF.host,
        'topic': constants.L2_AGENT_TOPIC,
        'configurations': configurations,
        'agent_type': self.agent_type,
        'resource_versions': resources.LOCAL_RESOURCE_VERSIONS,
        'start_flag': True}

    report_interval = cfg.CONF.AGENT.report_interval
    if report_interval:
        heartbeat = loopingcall.FixedIntervalLoopingCall(
            self._report_state)
        heartbeat.start(interval=report_interval)

    capabilities.notify_init_event(self.agent_type, self)
    # The initialization is complete; we can start receiving messages
    self.connection.consume_in_threads()

    self.daemon_loop()
With the earlier analysis of the neutron-server code behind us, the linuxbridge code is not hard to follow.
First, RPC is set up:
def start(self):
    self.prevent_arp_spoofing = cfg.CONF.AGENT.prevent_arp_spoofing

    # stores all configured ports on agent
    self.network_ports = collections.defaultdict(list)
    # flag to do a sync after revival
    self.fullsync = False
    self.context = context.get_admin_context_without_session()
    self.setup_rpc()

def setup_rpc(self):
    self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
    self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
    self.sg_agent = sg_rpc.SecurityGroupAgentRpc(
        self.context, self.sg_plugin_rpc, defer_refresh_firewall=True)

    self.agent_id = self.mgr.get_agent_id()
    LOG.info(_LI("RPC agent_id: %s"), self.agent_id)

    self.topic = topics.AGENT
    self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
    # RPC network init
    # Handle updates from service
    self.rpc_callbacks = self.mgr.get_rpc_callbacks(self.context, self,
                                                    self.sg_agent)
    self.endpoints = [self.rpc_callbacks]
    self._validate_rpc_endpoints()

    # Define the listening consumers for the agent
    consumers = self.mgr.get_rpc_consumers()
    self.connection = agent_rpc.create_consumers(self.endpoints,
                                                 self.topic,
                                                 consumers,
                                                 start_listening=False)
First 'plugin_rpc' is created. This RPC object is a producer: it is used to issue RPC calls, sending messages on the 'q-plugin' topic.
neutron/agent/rpc.py:
class PluginApi(object):
    '''Agent side of the rpc API.

    API version history:
        1.0 - Initial version.
        1.3 - get_device_details rpc signature upgrade to obtain 'host' and
              return value to include fixed_ips and device_owner for
              the device port
        1.4 - tunnel_sync rpc signature upgrade to obtain 'host'
        1.5 - Support update_device_list and
              get_devices_details_list_and_failed_devices
    '''

    def __init__(self, topic):
        target = oslo_messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)
Next 'sg_plugin_rpc' is created. Like the one above, it is an RPC producer used to issue RPC calls, mainly the security-group-related ones.
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
This sg_plugin_rpc is then used to construct a 'SecurityGroupAgentRpc' object, so that object can issue RPC calls through sg_plugin_rpc.
neutron/agent/securitygroups_rpc.py:
class SecurityGroupAgentRpc(object):
    """Enables SecurityGroup agent support in agent implementations."""

    def __init__(self, context, plugin_rpc, local_vlan_map=None,
                 defer_refresh_firewall=False, integration_bridge=None):
        self.context = context
        self.plugin_rpc = plugin_rpc
        self.init_firewall(defer_refresh_firewall, integration_bridge)
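As you can see, sg_plugin_rpc is assigned to self.plugin_rpc, which is later used to issue RPC calls to neutron-server. Then the init_firewall function loads and instantiates the driver class named by the configured firewall_driver: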
def init_firewall(self, defer_refresh_firewall=False,
                  integration_bridge=None):
    firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver or 'noop'
    LOG.debug("Init firewall settings (driver=%s)", firewall_driver)
    if not _is_valid_driver_combination():
        LOG.warning(_LW("Driver configuration doesn't match "
                        "with enable_security_group"))
    firewall_class = firewall.load_firewall_driver_class(firewall_driver)
    try:
        self.firewall = firewall_class(
            integration_bridge=integration_bridge)
    except TypeError:
        self.firewall = firewall_class()
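The try/except TypeError at the end of init_firewall is worth a closer look: it first tries to pass integration_bridge to the driver's constructor and falls back to a no-argument call for drivers that do not accept it. A minimal sketch of that pattern follows; the two driver classes and build_firewall are hypothetical names made up for this example.

```python
class BridgeAwareFirewall(object):
    """Hypothetical driver whose constructor accepts integration_bridge."""

    def __init__(self, integration_bridge=None):
        self.integration_bridge = integration_bridge


class PlainFirewall(object):
    """Hypothetical driver whose constructor takes no arguments."""

    def __init__(self):
        self.integration_bridge = None


def build_firewall(firewall_class, integration_bridge=None):
    """Instantiate the driver, falling back if the keyword is rejected."""
    try:
        return firewall_class(integration_bridge=integration_bridge)
    except TypeError:
        # The constructor does not accept integration_bridge.
        return firewall_class()
```

One caveat of this pattern is that a TypeError raised for any other reason inside the first constructor call also triggers the fallback, so the two cases cannot be told apart here.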
Continuing down the setup_rpc function:
self.agent_id = self.mgr.get_agent_id()
LOG.info(_LI("RPC agent_id: %s"), self.agent_id)
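The mgr's get_agent_id() function is called to obtain agent_id; this mgr is the LinuxBridgeManager constructed earlier in main. get_agent_id() takes the MAC address of the first device listed in the configured bridge_mappings (or, if none is configured, of the first device found on the host) and builds the ID from it, which keeps linuxbridge-agent IDs unique across machines.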
def get_agent_id(self):
    if self.bridge_mappings:
        mac = utils.get_interface_mac(
            list(self.bridge_mappings.values())[0])
    else:
        devices = ip_lib.IPWrapper().get_devices(True)
        if devices:
            mac = utils.get_interface_mac(devices[0].name)
        else:
            LOG.error(_LE("Unable to obtain MAC address for unique ID. "
                          "Agent terminated!"))
            sys.exit(1)
    return 'lb%s' % mac.replace(":", "")
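As a quick worked example of the ID format, the sketch below reproduces only the last line of get_agent_id plus the choice of which device to read the MAC from. The helper names pick_mac_source and agent_id_from_mac, and the sample MAC address, are made up for illustration; actually reading a MAC from the system is left out.

```python
def pick_mac_source(bridge_mappings, device_names):
    """Choose the device whose MAC is used, mirroring the branch order."""
    if bridge_mappings:
        # First value in the mapping dict, as in the agent code.
        return list(bridge_mappings.values())[0]
    if device_names:
        return device_names[0]
    return None  # the real agent logs an error and exits here


def agent_id_from_mac(mac):
    """Build the agent ID: 'lb' followed by the MAC without colons."""
    return 'lb%s' % mac.replace(":", "")
```

For a MAC of fa:16:3e:12:34:56 this gives lbfa163e123456.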
Then a state_rpc object is created. The linuxbridge-agent uses it to send periodic state reports (every 30 s by default) to neutron-server, so neutron-server knows which linuxbridge-agents are alive.
self.topic = topics.AGENT
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
Next the rpc_callbacks object is obtained and used as the endpoints of the RPC server. Endpoints are explained in detail in an earlier part: http://blog.csdn.net/happyanger6/article/details/54777429
self.rpc_callbacks = self.mgr.get_rpc_callbacks(self.context, self,
                                                self.sg_agent)
self.endpoints = [self.rpc_callbacks]
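Finally, all the topics to consume are collected and the consumers that will handle RPC calls are created. Because start_listening=False is passed, they do not start listening yet; that happens later, when consume_in_threads() is called: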
consumers = self.mgr.get_rpc_consumers()
self.connection = agent_rpc.create_consumers(self.endpoints,
                                             self.topic,
                                             consumers,
                                             start_listening=False)
Here get_rpc_consumers() returns all the topics to consume:
def get_rpc_consumers(self):
    consumers = [[topics.PORT, topics.UPDATE],
                 [topics.NETWORK, topics.DELETE],
                 [topics.NETWORK, topics.UPDATE],
                 [topics.SECURITY_GROUP, topics.UPDATE]]
    if cfg.CONF.VXLAN.l2_population:
        consumers.append([topics.L2POPULATION, topics.UPDATE])
    return consumers
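To give a feel for what these [resource, operation] pairs turn into, here is a sketch that combines each pair with the agent topic prefix into a topic name. Note the assumptions: the constant values below and the "prefix-resource-operation" naming scheme are written from memory of neutron's topics module and are not confirmed by the code quoted in this article, so treat the resulting strings as illustrative only. get_topic_name_sketch and consumer_topics are hypothetical helper names.

```python
# Assumed values of the neutron topics constants (not shown in the article).
AGENT = "q-agent-notifier"
PORT = "port"
NETWORK = "network"
SECURITY_GROUP = "security_group"
L2POPULATION = "l2population"
UPDATE = "update"
DELETE = "delete"


def get_topic_name_sketch(prefix, table, operation):
    """Assumed naming scheme: '<prefix>-<table>-<operation>'."""
    return "%s-%s-%s" % (prefix, table, operation)


def consumer_topics(l2_population=False):
    """Build the topic names for the consumer list returned above."""
    consumers = [[PORT, UPDATE],
                 [NETWORK, DELETE],
                 [NETWORK, UPDATE],
                 [SECURITY_GROUP, UPDATE]]
    if l2_population:
        consumers.append([L2POPULATION, UPDATE])
    return [get_topic_name_sketch(AGENT, table, op)
            for table, op in consumers]
```

Under these assumptions the agent would listen on one topic per pair, with an extra l2population topic only when l2_population is enabled for VXLAN.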
Next a periodic task is started to report the agent's state to neutron-server. The code shows what information the report contains:
#TODO(mangelajo): optimize resource_versions (see ovs agent)
self.agent_state = {
    'binary': self.agent_binary,
    'host': cfg.CONF.host,
    'topic': constants.L2_AGENT_TOPIC,
    'configurations': configurations,
    'agent_type': self.agent_type,
    'resource_versions': resources.LOCAL_RESOURCE_VERSIONS,
    'start_flag': True}

report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
    heartbeat = loopingcall.FixedIntervalLoopingCall(
        self._report_state)
    heartbeat.start(interval=report_interval)
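The heartbeat idea can be sketched without oslo.service. The code below is a simplified, blocking stand-in for a fixed-interval loop, not FixedIntervalLoopingCall itself (which runs in the background). run_fixed_interval and StateReporterSketch are hypothetical names, and dropping 'start_flag' after the first report is an assumption about what _report_state does, since that method is not shown in this article.

```python
import time


def run_fixed_interval(func, interval, iterations, sleep=time.sleep):
    """Call func `iterations` times, sleeping `interval` seconds between calls."""
    for i in range(iterations):
        func()
        if i < iterations - 1:
            sleep(interval)


class StateReporterSketch(object):
    """Hypothetical reporter that sends the agent state on each tick."""

    def __init__(self, agent_state, send):
        self.agent_state = agent_state
        self.send = send  # stand-in for the RPC call to neutron-server

    def report_state(self):
        # Send a copy so later changes do not alter past reports.
        self.send(dict(self.agent_state))
        # Assumption: start_flag only marks the first report after startup.
        self.agent_state.pop('start_flag', None)
```

Passing a fake sleep function (for example a list's append method) makes it possible to exercise the loop in a test without actually waiting.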
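The following statement uses the callbacks mechanism provided by neutron that was mentioned earlier; it invokes the previously registered event callback "init_handler":
capabilities.notify_init_event(self.agent_type, self)
Then the agent starts receiving RPC messages:
# The initialization is complete; we can start receiving messages
self.connection.consume_in_threads()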