Ambari Server-Agent Interaction Analysis
Server-agent interaction
1. Core classes
/**
* Monitors the node state and heartbeats.
*/
public class HeartbeatMonitor implements Runnable {
HeartbeatMonitor monitors node state and heartbeats.
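The text only says that HeartbeatMonitor watches node state and heartbeats. A common way to do that is to record when each host last sent a heartbeat and have a periodic loop flag hosts that have been silent for longer than a timeout. The sketch below shows only that idea, as an assumption. The class name HeartbeatTimeoutSketch, the HostHealth states and the timeout parameter are hypothetical and are not Ambari's API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of heartbeat-timeout detection; names and the threshold
// are illustrative, not Ambari's actual HeartbeatMonitor API.
class HeartbeatTimeoutSketch {
    enum HostHealth { HEALTHY, HEARTBEAT_LOST }

    private final long timeoutMillis;
    // Last heartbeat timestamp (ms) per host name.
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();
    // Current health verdict per host name.
    private final Map<String, HostHealth> health = new ConcurrentHashMap<>();

    HeartbeatTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called whenever a heartbeat arrives from an agent.
    void onHeartbeat(String host, long nowMillis) {
        lastHeartbeat.put(host, nowMillis);
        health.put(host, HostHealth.HEALTHY);
    }

    // One pass of the periodic monitor loop: flag hosts silent for too long.
    void check(long nowMillis) {
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                health.put(e.getKey(), HostHealth.HEARTBEAT_LOST);
            }
        }
    }

    HostHealth healthOf(String host) {
        return health.get(host);
    }
}
```

In a real monitor, `check` would run inside a `Runnable` loop that sleeps between passes. Taking the current time as a parameter keeps the logic easy to test.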
/**
* HeartbeatProcessor class is used for bulk processing data retrieved from agents in background
*
*/
public class HeartbeatProcessor extends AbstractService{
HeartbeatProcessor processes, in the background, the bulk data received from agents.
/**
* Agent Resource represents Ambari agent controller.
* It provides API for Ambari agents to get the cluster configuration changes
* as well as report the node attributes and state of services running the on
* the cluster nodes
*/
@Path("/")
public class AgentResource {
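AgentResource represents the Ambari agent controller. It provides the API that agents use to fetch cluster configuration changes
and to report node attributes and the state of the services running on the node.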
/**
* Data model for Ambari Agent to send heartbeat to Ambari Server.
*/
public class HeartBeat {
private long responseId = -1;
private long timestamp;
private String hostname;
// command reports
List<CommandReport> reports = new ArrayList<CommandReport>();
// component status
List<ComponentStatus> componentStatus = new ArrayList<ComponentStatus>();
// disk status
private List<DiskInfo> mounts = new ArrayList<DiskInfo>();
// host status
HostStatus nodeStatus;
private AgentEnv agentEnv = null;
// alerts
private List<Alert> alerts = null;
// recovery report
private RecoveryReport recoveryReport;
private long recoveryTimestamp = -1;
The HeartBeat class is the data model exchanged between the server and the agent.
2. Heartbeat logic
2.1 Heartbeat receiving flow
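The heartbeat method of AgentResource exposes the heartbeat REST API; agents call it to report node attributes and service status.
This endpoint calls HeartBeatHandler's handleHeartBeat method, which puts the heartbeat into the heartBeatsQueue queue.
Before queuing, the server compares the currentResponseId it has stored with the responseId carried in the heartbeat:
If responseId equals currentResponseId - 1, the previous response was lost, so the heartbeat does not need to be processed
and the previous response is simply returned again.
If responseId otherwise differs from currentResponseId, the sequence is out of order and the agent must be restarted.
If the host state is waiting for component status updates, the status is processed and a notification is sent.
After queuing, the commands waiting in actionQueue are also put into the response for the agent to execute.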
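The responseId check described above can be sketched as follows. This is a simplified model, not Ambari's HeartBeatHandler. The class HeartbeatReceiveSketch, its nested Heartbeat and Response types, the restartAgent flag and the use of plain strings as commands are all hypothetical. The step that handles hosts waiting for status updates is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the receive path; not Ambari's real HeartBeatHandler.
class HeartbeatReceiveSketch {
    static final class Heartbeat {
        final String hostname;
        final long responseId;

        Heartbeat(String hostname, long responseId) {
            this.hostname = hostname;
            this.responseId = responseId;
        }
    }

    static final class Response {
        final long responseId;
        final boolean restartAgent;
        final List<String> commands;

        Response(long responseId, boolean restartAgent, List<String> commands) {
            this.responseId = responseId;
            this.restartAgent = restartAgent;
            this.commands = commands;
        }
    }

    // Last response sent to each host; its id plays the role of currentResponseId.
    private final Map<String, Response> lastResponses = new ConcurrentHashMap<>();
    // Heartbeats waiting for the background processor (cf. heartBeatsQueue).
    final BlockingQueue<Heartbeat> heartBeatsQueue = new LinkedBlockingQueue<>();
    // Pending commands per host (stand-in for ActionQueue).
    final Map<String, Queue<String>> actionQueue = new ConcurrentHashMap<>();

    Response handleHeartBeat(Heartbeat hb) {
        Response last = lastResponses.getOrDefault(hb.hostname, new Response(0, false, List.of()));
        long currentResponseId = last.responseId;

        // Agent is one step behind: our last response was lost, so resend it unchanged.
        if (hb.responseId == currentResponseId - 1) {
            return last;
        }
        // Any other mismatch breaks the sequence: tell the agent to restart.
        if (hb.responseId != currentResponseId) {
            return new Response(currentResponseId, true, List.of());
        }

        // In-sequence heartbeat: hand it to the background processor.
        heartBeatsQueue.add(hb);

        // Move this host's pending commands into the response.
        List<String> commands = new ArrayList<>();
        Queue<String> pending = actionQueue.get(hb.hostname);
        if (pending != null) {
            String cmd;
            while ((cmd = pending.poll()) != null) {
                commands.add(cmd);
            }
        }
        Response next = new Response(currentResponseId + 1, false, commands);
        lastResponses.put(hb.hostname, next);
        return next;
    }
}
```

Caching the last response per host makes a lost reply cheap to recover from. The agent simply receives the same response again, and its commands are not dequeued twice.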
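2.2 Heartbeat processing flow
HeartbeatProcessor periodically triggers the HeartbeatProcessingTask thread to process the queued heartbeats.
It first processes the alerts in each heartbeat, publishing alert events through AlertEventPublisher;
the registered alert event listeners AlertAggregateListener, AlertReceivedListener and AlertStateChangedListener
each handle the alerts.
Next it processes status reports, mainly component and service status; the security state of components is also handled here.
Then it processes command reports, i.e. the execution reports of commands; there are many commands and this logic is fairly complex.
Finally it processes the host status.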
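The processing order above can be illustrated with the sketch below. A scheduled task drains the heartbeat queue and handles each heartbeat in four steps: alerts, status reports, command reports and host status. This is not Ambari's HeartbeatProcessor. The class HeartbeatProcessingSketch, its process* methods and the log list are hypothetical. Each step only records what it saw, in place of the real logic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the processing order; handlers only record their step.
class HeartbeatProcessingSketch {
    static final class Heartbeat {
        final String hostname;

        Heartbeat(String hostname) {
            this.hostname = hostname;
        }
    }

    final BlockingQueue<Heartbeat> heartBeatsQueue = new LinkedBlockingQueue<>();
    // Records each step so the processing order is visible.
    final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Runs runOnce() periodically on one background thread; caller must shut it down.
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleWithFixedDelay(this::runOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
        return ses;
    }

    // Body of one scheduled run (cf. HeartbeatProcessingTask): drain and process.
    void runOnce() {
        Heartbeat hb;
        while ((hb = heartBeatsQueue.poll()) != null) {
            processAlerts(hb);         // 1. publish alerts to the alert event bus
            processStatusReports(hb);  // 2. component/service (and security) status
            processCommandReports(hb); // 3. results of executed commands
            processHostStatus(hb);     // 4. overall host status
        }
    }

    private void processAlerts(Heartbeat hb) { log.add("alerts:" + hb.hostname); }

    private void processStatusReports(Heartbeat hb) { log.add("status:" + hb.hostname); }

    private void processCommandReports(Heartbeat hb) { log.add("commands:" + hb.hostname); }

    private void processHostStatus(Heartbeat hb) { log.add("host:" + hb.hostname); }
}
```

Processing happens on a background thread, separate from the REST call. This keeps the heartbeat endpoint fast even when command reports are expensive to handle.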
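2.3 Command enqueue flow
ActionScheduler encapsulates an action scheduling thread; the scheduler periodically looks at the action database and determines whether any action can be scheduled.
If so, it puts the commands into actionQueue.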
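A minimal sketch of the enqueue flow, under two assumptions: an in-memory list stands in for the action database, and a map of per-host queues stands in for ActionQueue. ActionSchedulerSketch, PendingCommand and the queued flag are hypothetical names. The real scheduler works on persisted stages and commands and applies far more rules.

```java
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: an in-memory list replaces the action database and a
// map of per-host queues replaces ActionQueue. Not Ambari's ActionScheduler.
class ActionSchedulerSketch implements Runnable {
    static final class PendingCommand {
        final String host;
        final String command;
        volatile boolean queued; // set once the command has been handed to actionQueue

        PendingCommand(String host, String command) {
            this.host = host;
            this.command = command;
        }
    }

    final List<PendingCommand> actionDb = new CopyOnWriteArrayList<>();
    final Map<String, Queue<String>> actionQueue = new ConcurrentHashMap<>();
    private final long sleepMillis;
    private volatile boolean running = true;

    ActionSchedulerSketch(long sleepMillis) {
        this.sleepMillis = sleepMillis;
    }

    // One scheduling pass: enqueue every command not yet handed to an agent.
    void doWork() {
        for (PendingCommand pc : actionDb) {
            if (!pc.queued) {
                actionQueue.computeIfAbsent(pc.host, h -> new ConcurrentLinkedQueue<>())
                           .add(pc.command);
                pc.queued = true;
            }
        }
    }

    // Scheduler thread body: poll the "database", then sleep, until stopped.
    @Override
    public void run() {
        while (running) {
            doWork();
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() {
        running = false;
    }
}
```

The queued flag keeps repeated polling passes from enqueuing the same command twice. The real scheduler gets the same effect by updating command status in the database.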
Appendix: core classes
public class ActionQueue {
private static HashSet<String> EMPTY_HOST_LIST = new HashSet<String>();
// per-host command queues
final ConcurrentMap<String, Queue<AgentCommand>> hostQueues;
HashSet<String> hostsWithPendingTask = new HashSet<String>();
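ActionQueue is the action queue: hostQueues holds the commands waiting to be sent to each host, and hostsWithPendingTask records the hosts that have pending tasks.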
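A simplified, hypothetical version of ActionQueue is sketched below. It has the same two fields, but commands are plain strings rather than AgentCommand, and the method names enqueue, dequeueAll and hasPendingTask are illustrative. Treat it as a sketch of the data structure, not Ambari's exact API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Simplified, hypothetical ActionQueue: commands are plain strings.
class ActionQueueSketch {
    // host name -> commands waiting to be sent to that host's agent
    final ConcurrentMap<String, Queue<String>> hostQueues = new ConcurrentHashMap<>();
    // hosts that (may) have at least one queued command
    final Set<String> hostsWithPendingTask = ConcurrentHashMap.newKeySet();

    void enqueue(String host, String command) {
        hostQueues.computeIfAbsent(host, h -> new ConcurrentLinkedQueue<>()).add(command);
        hostsWithPendingTask.add(host);
    }

    // Remove and return every command queued for the host, in FIFO order.
    List<String> dequeueAll(String host) {
        // Clear the flag first: a concurrent enqueue then re-sets it, so the flag
        // can only be a harmless false positive, never a missed command.
        hostsWithPendingTask.remove(host);
        List<String> out = new ArrayList<>();
        Queue<String> q = hostQueues.get(host);
        if (q != null) {
            String cmd;
            while ((cmd = q.poll()) != null) {
                out.add(cmd);
            }
        }
        return out;
    }

    boolean hasPendingTask(String host) {
        return hostsWithPendingTask.contains(host);
    }
}
```

Keeping one queue per host lets the heartbeat path fetch just the commands for the host that is calling in, without scanning a global list.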
/**
* This class encapsulates the action scheduler thread.
* Action schedule frequently looks at action database and determines if
* there is an action that can be scheduled.
*/
@Singleton
class ActionScheduler implements Runnable {
ActionScheduler encapsulates an action scheduling thread; the scheduler periodically looks at the action database and determines whether any action can be scheduled.
/**
* Execution commands are scheduled by action manager, and these are
* persisted in the database for recovery.
*/
public class ExecutionCommand extends AgentCommand {
Execution commands are scheduled by the action manager and persisted in the database for recovery.
/**
* The {@link AlertEventPublisher} is used to wrap a customized instance of an
* {@link AsyncEventBus} that is only used for alerts. In general, Ambari should
* have its own application-wide event bus for application events (session
* information, state changes, etc), but since alerts can contain many events
* being published concurrently, it makes sense to encapsulate a specific alert
* bus in this publisher.
*/
@Singleton
public final class AlertEventPublisher {
/**
* A multi-threaded event bus that can handle dispatching {@link AlertEvent}s.
*/
private final EventBus m_eventBus;
AlertEventPublisher wraps Guava's EventBus (an AsyncEventBus used only for alerts); the following three listeners are registered with it.
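To show the publish/subscribe pattern without depending on Guava, the sketch below uses a small JDK-only stand-in. MiniAsyncEventBus, AlertEventSketch and AlertEventPublisherSketch are hypothetical. This is not Guava's API: Guava's EventBus discovers listeners through @Subscribe-annotated methods and also delivers events to supertype subscribers. Here, listeners register explicitly per event type and only exact-type matches are delivered.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical JDK-only stand-in for an AsyncEventBus: listeners register per
// event type and events are dispatched on the supplied Executor.
class MiniAsyncEventBus {
    private final Executor executor;
    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();

    MiniAsyncEventBus(Executor executor) {
        this.executor = executor;
    }

    <T> void register(Class<T> type, Consumer<? super T> listener) {
        subscribers.computeIfAbsent(type, t -> new CopyOnWriteArrayList<>())
                   .add(event -> listener.accept(type.cast(event)));
    }

    void post(Object event) {
        // Exact-type match only (Guava also notifies supertype subscribers).
        List<Consumer<Object>> listeners = subscribers.getOrDefault(event.getClass(), List.of());
        for (Consumer<Object> l : listeners) {
            executor.execute(() -> l.accept(event));
        }
    }
}

// Illustrative alert event carrying just a name and a state.
class AlertEventSketch {
    final String alertName;
    final String state;

    AlertEventSketch(String alertName, String state) {
        this.alertName = alertName;
        this.state = state;
    }
}

// A publisher that owns a bus dedicated to alerts, as the Javadoc describes.
class AlertEventPublisherSketch {
    private final MiniAsyncEventBus bus;

    AlertEventPublisherSketch(Executor executor) {
        this.bus = new MiniAsyncEventBus(executor);
    }

    <T> void register(Class<T> type, Consumer<? super T> listener) {
        bus.register(type, listener);
    }

    void publish(Object event) {
        bus.post(event);
    }
}
```

Passing a thread pool as the Executor gives asynchronous, multi-threaded dispatch. Passing `Runnable::run` runs listeners on the caller's thread, which is convenient for tests.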
/**
* The {@link AlertAggregateListener} is used to listen for all incoming
* {@link AlertStateChangeEvent} instances and determine if there exists a
* {@link SourceType#AGGREGATE} alert that needs to run.
* <p/>
* This listener is only needed on state changes as aggregation of alerts is
* only performed against the state of an alert and not the values that
* contributed to that state. However, this listener should only be concerned
* with {@link AlertFirmness#HARD} events as they represent a true change in the
* state of an alert. Calculations should never be performed on
* {@link AlertFirmness#SOFT} alerts since they may be false positives.
*/
@Singleton
@EagerSingleton
public class AlertAggregateListener {
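AlertAggregateListener listens for all alert state change events (AlertStateChangeEvent) and determines whether an aggregate (SourceType#AGGREGATE) alert needs to run; it only considers HARD alerts.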
/**
* The {@link AlertReceivedListener} class handles {@link AlertReceivedEvent}
* and updates the appropriate DAOs. It may also fire new
* {@link AlertStateChangeEvent} when an {@link AlertState} change is detected.
*/
@Singleton
@EagerSingleton
public class AlertReceivedListener {
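AlertReceivedListener handles received-alert events (AlertReceivedEvent) and updates the corresponding DAOs.
It may also fire a new alert state change event (AlertStateChangeEvent) when it detects a change of AlertState.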
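The core of the received-alert handling can be sketched as "store the new state, and fire a state change event only if it differs from the stored one". In this hypothetical sketch, an in-memory map replaces the alert DAOs, and StateChange is a simplified stand-in for AlertStateChangeEvent. AlertReceivedSketch and its methods are illustrative names. Treating the first sighting of an alert as a change (from null) is a choice made for the sketch only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch: a map replaces the alert DAOs and StateChange is a
// simplified stand-in for AlertStateChangeEvent.
class AlertReceivedSketch {
    enum AlertState { OK, WARNING, CRITICAL, UNKNOWN }

    static final class StateChange {
        final String key;
        final AlertState from; // null when the alert is seen for the first time
        final AlertState to;

        StateChange(String key, AlertState from, AlertState to) {
            this.key = key;
            this.from = from;
            this.to = to;
        }
    }

    // "host/definition" -> last known state (stands in for the current-alert table)
    private final Map<String, AlertState> current = new ConcurrentHashMap<>();
    private final Consumer<StateChange> stateChangePublisher;

    AlertReceivedSketch(Consumer<StateChange> stateChangePublisher) {
        this.stateChangePublisher = stateChangePublisher;
    }

    void onAlertReceived(String host, String definition, AlertState newState) {
        String key = host + "/" + definition;
        AlertState old = current.put(key, newState);
        // Fire a follow-up event only when the state actually changed.
        if (old != newState) {
            stateChangePublisher.accept(new StateChange(key, old, newState));
        }
    }
}
```

Because only real transitions produce events, repeated OK reports from a healthy host cause no downstream notification work.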
/**
* The {@link AlertStateChangedListener} class response to
* {@link AlertStateChangeEvent} and creates {@link AlertNoticeEntity} instances
* in the database.
* <p/>
* {@link AlertNoticeEntity} instances will only be updated if the firmness of
* the alert is {@link AlertFirmness#HARD}. In the case of {@link AlertState#OK}
* (which is always {@link AlertFirmness#HARD}), then the prior alert must be
* {@link AlertFirmness#HARD} for any notifications to be created. This is
* because a SOFT non-OK alert (such as CRITICAL) would not have caused a
* notification, so changing back from this SOFT state should not either.
* <p/>
* This class will not create {@link AlertNoticeEntity}s in the following cases:
* <ul>
* <li>If {@link AlertTargetEntity#isEnabled()} is {@code false}
* <li>If the cluster is upgrading or the upgrade is suspended, only
* {@link Services#AMBARI} alerts will be dispatched.
* </ul>
*/
@Singleton
@EagerSingleton
public class AlertStateChangedListener {
AlertStateChangedListener responds to alert state change events (AlertStateChangeEvent)
and creates AlertNoticeEntity instances in the database.
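The Javadoc above states precise rules for when a notice is created. The sketch below encodes them as a single predicate. AlertNoticeRules, shouldCreateNotice and its parameters are hypothetical simplifications. Ambari evaluates these rules per alert target with its own entity types, and this sketch considers only one target.

```java
// Hypothetical predicate encoding the AlertStateChangedListener Javadoc rules;
// parameters are simplified stand-ins, not Ambari types.
final class AlertNoticeRules {
    enum AlertState { OK, WARNING, CRITICAL, UNKNOWN }

    enum AlertFirmness { SOFT, HARD }

    private AlertNoticeRules() {
    }

    static boolean shouldCreateNotice(AlertState newState, AlertFirmness newFirmness,
                                      AlertFirmness priorFirmness, boolean targetEnabled,
                                      boolean clusterUpgrading, String serviceName) {
        // Only HARD alerts represent a true state change.
        if (newFirmness != AlertFirmness.HARD) {
            return false;
        }
        // Returning to OK notifies only if the prior alert was HARD, because a
        // SOFT non-OK alert never produced a notification in the first place.
        if (newState == AlertState.OK && priorFirmness != AlertFirmness.HARD) {
            return false;
        }
        // Disabled targets never receive notices.
        if (!targetEnabled) {
            return false;
        }
        // While upgrading (or with a suspended upgrade), only AMBARI alerts go out.
        return !clusterUpgrading || "AMBARI".equals(serviceName);
    }
}
```

Putting the rules in one side-effect-free method makes each Javadoc case directly testable.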
/**
* The {@link AmbariEventPublisher} is used to publish instances of
* {@link AmbariEvent} to any {@link Subscribe} methods interested. It uses a
* single-threaded {@link AsyncEventBus}.
*/
@Singleton
public class AmbariEventPublisher {
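AmbariEventPublisher publishes AmbariEvent instances to any interested @Subscribe methods, using a single-threaded AsyncEventBus.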