时序数据流经过Kafka队列时可能产生的乱序原因和解决方法

Kafka作为一个流行的消息队列,以分布式高性能,高可靠性等特点已经在多种场景下广泛使用。在工业互联网、物联网时序数据存储的解决方案中也有大量用到。

但在实际部署过程中,可能会因为配置原因导致经过Kafka的数据在接收方产生乱序,给后续处理环节带来排序等工作,造成不必要的处理开销,降低系统的处理性能和额外排序的工作。

其实可以通过合理的规划设计Kafka的配置和方法来避免消息在通过Kafka后乱序的产生,只需要遵循以下原则即可:

对于需要确保顺序的一条消息流,发送到同一个partition上去

Kafka可以在一个topic下设置多个partition来实现分布式和负载均衡,由同一consumer group下的不同consumer去消费;这样的机制能够支持多线程分布式的处理,带来高性能,但也带来了同一消息流走了不同路径的可能性,如果没有针对性的规划,从架构上就无法保证消息的顺序。如下图所示,对于同一个topic的一条消息流,写入不同的partition,就会产生多条路径。

时序数据流经过Kafka队列时可能产生的乱序原因和解决方法 - TDengine Database 时序数据库

为了确保一条消息流的数据能够严格按照时间顺序被消费,则必须遵循一条路径的原则,这样才能实现FIFO(First In First Out)。

根据Kafka的文档描述,把哪条记录发到哪个partition,是由producer负责:

Producers

Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record). More on the use of partitioning in a second!

可见,Kafka已考虑到了确保消息顺序的需求,提供了接口来实现根据指定的key值发送到同一partition的方法。 可以看看Kafka相关源码:

class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
  private val random = new java.util.Random

  def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
  }
}

从源码上来看,Kafka支持通过Key的hash值对partition的数量求余来实现基于Key的分配partition方法。因此我们只要对不同时序消息流,找到他们不同的key,并且这个key是不会发生变化的,那么就能在发送到Kafka的时候,确保每一条消息流发送到同一个partition,走唯一的路径。因此我们可以通过指定Key的方式,来实现这种严格的时序关系。

具体实现方法

TDengine Database的应用场景下,我们通常会把某一类设备(超级表)划分为一个topic。对于每个设备,会单独建表,一个设备产生的数据,会只放到一张表里。对于设备产生的原始数据,就需要在这个数据中找一个能够代表这个数据的ID,而且不会发生变化的字段,作为Key值,在发送给Kafka时,带上这个Key值。这样就能确保该设备的所有数据流经过Kafka时,走唯一的路径。这个ID或key往往是设备具有唯一性的设备编码,这个编码不仅可以作为Kafka的Key,也可以作为TDengine Database里的表名。

具体实现非常简单,在producer发送数据时,选择一个key,通过KeyedMessage方法生成消息,然后send。以Java为例,其他语言可以从Kafka文档中找到相同功能的接口:

 producer.send(new KeyedMessage<String, String>(topic,key,record))

这个接口,可以让使用者非常方便无需增加代码的情况下来实现指定每个消息流绑定一个partition的结果。用户也可以通过自己实现一个partition的算法,来实现更精准的partition分配控制。具体实现可以参考”kafka 指定partition生产,消费“中的介绍。