一。先介绍核心类

/**
 * Monitors the node state and heartbeats.
 */
public class HeartbeatMonitor implements Runnable {

HeartbeatMonitor监控节点状态和心跳。

/**
 * HeartbeatProcessor class is used for bulk processing data retrieved from agents in background
 *
 */
public class HeartbeatProcessor extends AbstractService{

HeartbeatProcessor后台处理从agent获取的大量数据

/**
 * Agent Resource represents Ambari agent controller.
 * It provides API for Ambari agents to get the cluster configuration changes
 * as well as report the node attributes and state of services running the on
 * the cluster nodes
 */
@Path("/")
public class AgentResource {

AgentResource代表ambari agent controllr;为agent提供获得集群配置变更,报告节点属性,
以及节点上运行服务运行状态的api接口。

/**
 * Data model for Ambari Agent to send heartbeat to Ambari Server.
 */
public class HeartBeat {
  private long responseId = -1;
  private long timestamp;
  private String hostname;
  //命令报告
  List<CommandReport> reports = new ArrayList<CommandReport>();
  //组件状态
  List<ComponentStatus> componentStatus = new ArrayList<ComponentStatus>();
  //磁盘状态
  private List<DiskInfo> mounts = new ArrayList<DiskInfo>();
  //主机状态
  HostStatus nodeStatus;
  private AgentEnv agentEnv = null;
  //告警
  private List<Alert> alerts = null;
  //恢复报告
  private RecoveryReport recoveryReport;
  private long recoveryTimestamp = -1;

HeartBeat类为server与agent交互的数据模型。

二。心跳逻辑

2.1 接受心跳流程

AgentResource类的heartbeat方法提供心跳rest-api,agent通过调用该api报告节点属性及服务运行状态。
该接口调用HeartBeatHandler类的handleHeartBeat接口,将心跳信息放到队列heartBeatsQueue中。
放入队列之前,会首先根据服务器保存的currentResponseId和心跳包含的responseId做对比,

如果responseId 等于 currentResponseId - 1,则意味着旧的response丢失,无需处理,
只需要返回上一次的response即可。

如果responseId 不等于 currentResponseId,则意味着顺序错乱,此时需要重启agent。

如果主机状态为等待组件状态更新,则处理状态并进行通知。

放入队列后,还需要把actionQueue中待执行的命令,放到response中,给agent执行。

2.2 处理心跳流程

HeartbeatProcessor定时触发HeartbeatProcessingTask线程,处理心跳信息。
首先处理心跳中的警告信息,通过AlertEventPublisher将告警时间发布出去,
注册了告警时间处理的AlertAggregateListener,AlertReceivedListener,AlertStateChangedListener
对警告分别进行处理。

然后处理状态报告,主要处理组件服务状态,这里还会处理组件的安全状态。

接下来处理命令报告,主要处理命令执行报告,命令较多,逻辑较复杂。

最后处理主机状态。

2.3 命令入队流程

ActionScheduler封装了一个action调度线程,action调度器定期查看行为数据库,决定是否有action可以调度。
如果有,就将命令放到actionQueue中。

附:核心类

public class ActionQueue {

  private static HashSet<String> EMPTY_HOST_LIST = new HashSet<String>();
  //主机对应命令数组
  final ConcurrentMap<String, Queue<AgentCommand>> hostQueues;

  HashSet<String> hostsWithPendingTask = new HashSet<String>();

ActionQueue行为队列,

/**
 * This class encapsulates the action scheduler thread.
 * Action schedule frequently looks at action database and determines if
 * there is an action that can be scheduled.
 */
@Singleton
class ActionScheduler implements Runnable {

ActionScheduler封装了一个action调度线程,action调度器定期查看行为数据库,决定是否有action可以调度。

/**
 * Execution commands are scheduled by action manager, and these are
 * persisted in the database for recovery.
 */
public class ExecutionCommand extends AgentCommand {

命令执行由action管理器调取,这些被持久化在数据库,以供恢复。

/**
 * The {@link AlertEventPublisher} is used to wrap a customized instance of an
 * {@link AsyncEventBus} that is only used for alerts. In general, Ambari should
 * have its own application-wide event bus for application events (session
 * information, state changes, etc), but since alerts can contain many events
 * being published concurrently, it makes sense to encapsulate a specific alert
 * bus in this publisher.
 */
@Singleton
public final class AlertEventPublisher {

  /**
   * A multi-threaded event bus that can handle dispatching {@link AlertEvent}s.
   */
  private final EventBus m_eventBus;

AlertEventPublisher使用了guava库的EventBus类;共有下面三个listener注册。

/**
 * The {@link AlertAggregateListener} is used to listen for all incoming
 * {@link AlertStateChangeEvent} instances and determine if there exists a
 * {@link SourceType#AGGREGATE} alert that needs to run.
 * <p/>
 * This listener is only needed on state changes as aggregation of alerts is
 * only performed against the state of an alert and not the values that
 * contributed to that state. However, this listener should only be concerned
 * with {@link AlertFirmness#HARD} events as they represent a true change in the
 * state of an alert. Calculations should never be performed on
 * {@link AlertFirmness#SOFT} alerts since they may be false positives.
 */
@Singleton
@EagerSingleton
public class AlertAggregateListener {

AlertAggregateListener用来监听所有告警状态变更事件AlertStateChangeEvent,

/**
 * The {@link AlertReceivedListener} class handles {@link AlertReceivedEvent}
 * and updates the appropriate DAOs. It may also fire new
 * {@link AlertStateChangeEvent} when an {@link AlertState} change is detected.
 */
@Singleton
@EagerSingleton
public class AlertReceivedListener {

AlertReceivedListener处理接收事件,并更新对应的数据库。
它也可能会触发新的告警状态变更事件。

/**
 * The {@link AlertStateChangedListener} class response to
 * {@link AlertStateChangeEvent} and creates {@link AlertNoticeEntity} instances
 * in the database.
 * <p/>
 * {@link AlertNoticeEntity} instances will only be updated if the firmness of
 * the alert is {@link AlertFirmness#HARD}. In the case of {@link AlertState#OK}
 * (which is always {@link AlertFirmness#HARD}), then the prior alert must be
 * {@link AlertFirmness#HARD} for any notifications to be created. This is
 * because a SOFT non-OK alert (such as CRITICAL) would not have caused a
 * notification, so changing back from this SOFT state should not either.
 * <p/>
 * This class will not create {@link AlertNoticeEntity}s in the following cases:
 * <ul>
 * <li>If {@link AlertTargetEntity#isEnabled()} is {@code false}
 * <li>If the cluster is upgrading or the upgrade is suspended, only
 * {@link Services#AMBARI} alerts will be dispatched.
 * </ul>
 */
@Singleton
@EagerSingleton
public class AlertStateChangedListener {

AlertStateChangedListener响应告警状态变更事件AlertStateChangeEvent,
并在数据库中创建AlertNoticeEntity实例。

/**
 * The {@link AmbariEventPublisher} is used to publish instances of
 * {@link AmbariEvent} to any {@link Subscribe} methods interested. It uses a
 * single-threaded {@link AsyncEventBus}.
 */
@Singleton
public class AmbariEventPublisher {

AmbariEventPublisher

Logo

Agent 垂直技术社区,欢迎活跃、内容共建,欢迎商务合作。wx: diudiu5555

更多推荐