在分布式系统中,节点间时间同步是业务正常运行的前提。一般自己搭建ntp server来作为时间同步服务器,其他节点可以使用crontab定期通过ntpdate来进行时间同步。

ambari是一套完善的集群管理系统。主要工作方式为:ambari-server作为服务端,管理各个ambari-agent端的信息收集、命令下发等操作,此时便可以通过自定义扩展模块,来实现自定义业务的管理(如ceph集群管理)。

agent端将定期上报心跳和相关状态信息,来向server端知会该节点的存活状态,若一定时间内server端未收到agent端的心跳上报,则可以认为agent服务所运行的节点发生异常,将该节点标记为“心跳丢失”;若心跳上报正常,但是根据脚本检测到某些服务异常,则会上报这些异常信息,并在告警系统中显示,以便于管理人员及时处理。

问题发现

最近测试同学发来一张问题单,测试方法如下所示:

  1. 进入某个agent节点,注释掉crontab中定期时间同步的命令,使该节点无法与ntp服务器同步时间
  2. 修改该agent节点的时间。当前时间为11:12:22,修改为22:22:22。

    此处测试同学是随意修改的,并没有注意修改的时间是早于还是晚于当前时间。这里埋下伏笔(当然也是后面定位问题时发现的)。

  3. 恢复crontab中的注释掉的ntpdate代码,使之能够与ntp server同步时间。
  4. 几分钟后,控制台上抛出节点失去心跳的消息。

问题考虑

由于之前只是基本地了解了一下ambari-server与ambari-agent的工作原理,并未实际地分析相关源代码,可以趁此机会熟悉一下比较关键的agent-server心跳处理流程。

此处考虑需要分析以下几点:

  1. agent端是如何上报心跳的?
  2. agent端上报心跳的频率是多少?如何控制?什么场景下会ambari-agent正常运行,但是无法上报心跳?
  3. (延伸)server端是如何处理agent端上报的心跳的?

问题复现

此处虽然涉及ntp,但可以想到这与ntp并无关系。

问题是通过修改时间来引起的,因此可以想到有两种可能:

  1. 修改后的时间晚于当前时间
  2. 修改后的时间早于当前时间

在本地搭建虚拟机进行复现。

agent端上报心跳的代码为python脚本,不易打断点调试,因此直接查看日志进行分析。
修改agent端日志级别为DEBUG:

vim /etc/ambari-agent/conf/ambari-agent.ini
#修改logger的 INFO 为DEBUG

tailf /var/log/ambari-agent/ambari-agent.log #实时查看日志信息

在集群健康的时候,分别手动设置系统时间早于和晚于当前时间,可以发现前者场景是必现的。

问题分析

此时就需要考虑,什么场景下会卡住无法上报心跳?
首先需要分析原有的心跳上报流程,看是否某个条件不满足,没有触发上报流程。

在分析许久后,发现并没有在代码流程中进行限制不允许在时间早于某个时间、或大于某个时间差时上报心跳。问题一度陷入僵局。
后面考虑从线程级别分析,是否是线程卡死无法上报,在添加了许多日志才确认了这一点。

agent心跳上报流程

主要涉及scheduler.py,Controller.py, HeartbeatHandlers.py, Heartbeat.py。

scheduler/任务调度暂未详细查看(由于此问题不涉及),主要排查了其他几项。基本流程如下:

  1. 心跳上报由单线程进行,每约1s上报一次心跳,每隔若干时间上报一次状态。此处根据打印,为Thread-4。

  2. 主线程为了能够控制心跳上报线程的启停,采取了thread.Event来进行通信heartbeat_wait_event。在Controller的init中注册为heartbeat_stop_callback,实现了一下三个方法:

    heartbeat_stop_callback.set_heartbeat()
    heartbeat_stop_callback.reset_heartbeat()
    heartbeat_stop_callback.wait(timeout,self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
    

    该三个方法分别对应了将event设置为true、清空为false、以及wait。

    在wait方法中传入了一个超时时间timeout=0.9,表示在超时时间内,如果有其他线程set event的flag为true,则返回true;若在超时时间内仍未有其他线程set event的flag为true,则返回false。

    此处wait方法中的第二个参数并未使用。不知道最初想实现什么效果。

    在实际工作时,用于上报心跳的单线程起始于阻塞的线程的返回:heartbeat_wait_event.wait

    该函数可以返回true(意味着在超时时间内有线程将其set,通知其他阻塞的线程恢复运行状态),也可以返回false,意味着超时,所有阻塞的线程将恢复运行状态。

在这里插入图片描述

问题复现时现象

阻塞的线程未能继续运行,一直保持阻塞,导致无法继续上报心跳:
在这里插入图片描述

附录:日志添加

# HeartbeatHandlers.py
def wait(self, timeout1, timeout2=0):
    if self._stop:
      logger.info("Stop event received")
      return 0

    if self.heartbeat_wait_event.wait(timeout=timeout1):
      logger.debug("heartbeat_wait_event.wait return 1,timeout=%s",timeout)
      return 1
    logger.debug("heartbeat_wait_event.wait return -1,timeout=%s",timeout)
    return -1

threading.Event

Event其实就是一个简化版的 Condition。Event没有锁,无法使线程进入同步阻塞状态。

isSet(): 当内置标志为True时返回True。
set(): 将标志设为True,并通知所有处于等待阻塞状态的线程恢复运行状态。
clear(): 将标志设为False。
wait([timeout]): 如果标志为True将立即返回,否则阻塞线程至等待阻塞状态,等待其他线程调用set()。

threading.Event()

import threading
import time

event = threading.Event()
def func():
    print("%s before wait for event,timestamp(%s)" %(  threading.currentThread().getName(),time.time()))
    if event.wait(1):
       print("%s recv event true,timestamp(%s)." %( threading.currentThread().getName(), time.time()))
    else:
       print("%s recv event false,timestamp(%s)." % (threading.currentThread().getName(), time.time()))

event.clear()
for i in range (5):
    print("curThread(%s) ,timestamp(%s).begin to start new thread." % (threading.currentThread().getName(), time.time()))    t1 = threading.Thread(target=func)
    t1.start()
    time.sleep(1)
    print("\n")

#print("Main thread set event")
#event.set()

if __name__ == "__main__":
    func()
    print("ALL_DONE,curThread(%s) ,timestamp(%s)." % (threading.currentThread().getName(), time.time()))

正常情况下:
在这里插入图片描述

在循环执行的过程中修改系统时间小于当前时间,构造线程持续阻塞未能超时继续的场景。发现Thread-2一直未返回:

在这里插入图片描述
手动同步时间后,event超时检测生效,线程由阻塞态变为执行态:

在这里插入图片描述
因此可以说明,该问题为修改系统时间小于当前时间时,线程event超时无法生效导致使线程持续处于阻塞态,无法运行。

问题解决

该问题在修改系统时间早于当前时间是必现的。

ambari用了一个线程上报心跳,用threading.event来做线程间通信,实际上运行过程中都是通过wait(timeout)来由阻塞态变为运行态。

这里应该是修改系统时间导致wait(timeout)的超时机制失效,所以一直卡在阻塞态不能上报心跳。同步时间后,重启一下ambari-agent就可以了。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建,欢迎商务合作。wx: diudiu5555

更多推荐