在物联网与工业互联网场景中,海量时序数据的实时流转是业务系统的核心需求。时序数据库不仅需要高效存储,还必须提供灵活的数据订阅与推送能力。TDengine 内置了类消息队列的数据订阅机制,无需额外部署 Kafka、RabbitMQ 等中间件,即可实现毫秒级实时数据推送,大幅降低系统架构复杂度与运维成本。本文将从主题创建、消费组管理、进度控制到典型应用场景,全面解析 TDengine 时序数据库的数据订阅与实时推送实践。
一、内置类消息队列:为什么不再需要 Kafka
传统架构中,时序数据从写入到消费往往需要经过独立的消息队列中间件,数据链路长、运维成本高。TDengine 时序数据库将消息队列能力直接内置于数据库引擎中,形成”写入即订阅”的极简架构。
1.1 核心优势
- 架构简化:无需部署和维护独立的 Kafka、Pulsar 等消息中间件,减少组件数量和运维负担
- 零数据拷贝:订阅系统直接基于 WAL(Write-Ahead Log)读取数据,避免了数据从数据库到消息队列的二次传输
- 毫秒级延迟:数据写入完成后立即推送给消费者,端到端延迟可控制在毫秒级别
- SQL 预处理:支持在数据库端完成数据过滤与计算,仅将结果集推送给消费者,大幅减少网络传输量
1.2 与 Kafka 的对比
| 特性 | TDengine 数据订阅 | Kafka |
|---|---|---|
| 部署方式 | 内置,无需额外组件 | 独立集群部署 |
| 数据存储 | 与时序数据共享存储 | 独立日志存储 |
| 数据过滤 | 支持 SQL 预处理 | 需要流处理框架 |
| 消费模型 | 推拉结合 | 纯拉取模式 |
| 运维成本 | 低(统一运维) | 高(独立运维) |
二、主题创建:灵活定义数据来源
主题(Topic)是 TDengine 数据订阅的核心抽象,定义了数据的来源和范围。TDengine 支持三种粒度的主题创建方式,满足不同场景的数据消费需求。
2.1 按数据库创建主题
订阅整个数据库中的所有表数据,适用于需要全量数据监控的场景:
-- 创建数据库级别的主题
CREATE TOPIC topic_db AS DATABASE power;
该主题会自动覆盖 power 数据库中所有表的数据变更,包括新增子表的数据。
2.2 按超级表创建主题
订阅指定超级表的数据,适合按业务模块划分数据消费范围:
-- 创建超级表级别的主题
CREATE TOPIC topic_meters AS STABLE meters;
该主题仅包含 meters 超级表及其所有子表的数据,粒度更精确。
2.3 按 SQL 查询创建主题
基于自定义 SQL 查询创建主题,提供最灵活的数据筛选能力,是 TDengine 时序数据库区别于传统消息队列的核心特性之一:
-- 创建 SQL 查询主题,仅订阅电压超过 220V 的数据
CREATE TOPIC topic_high_voltage AS
SELECT ts, location, voltage, current
FROM meters
WHERE voltage > 220;
-- 创建聚合主题,订阅每分钟的平均电流
CREATE TOPIC topic_avg_current AS
SELECT _wstart, location, AVG(current) AS avg_current
FROM meters
INTERVAL(1m);
SQL 主题的优势在于:过滤和聚合计算在数据库服务端完成,消费者只需接收处理后的结果集,网络传输量可降低数倍甚至数十倍。
2.4 主题管理操作
-- 查看所有主题
SHOW TOPICS;
-- 删除主题
DROP TOPIC topic_db;
三、实时推送:数据写入即推送
TDengine 时序数据库采用推拉结合(Push-Pull Hybrid)的数据传输模式,兼顾实时性与资源效率。
3.1 推送机制
当新数据写入 TDengine 时,订阅系统会立即检测到 WAL 中的数据变更,并主动推送给等待中的消费者。整个流程无需消费者轮询,实现真正的毫秒级延迟:
数据写入 → WAL 落盘 → 订阅系统检测 → 主动推送至消费者
3.2 长轮询兜底
当暂时没有新数据时,消费者通过长轮询机制保持与服务端的连接。一旦有新数据到达,服务端立即响应并推送数据,避免了频繁空轮询造成的资源浪费。
3.3 代码示例:消费数据
import taos
# 创建消费者连接
consumer = taos.Consumer(
group_id="g1",
topics=["topic_meters"],
auto_offset="earliest"
)
# 循环拉取数据
for message in consumer:
print(f"主题: {message.topic}, 分区: {message.partition}")
for row in message.rows:
print(f" 时间: {row[0]}, 电流: {row[1]}, 电压: {row[2]}")
# 处理完成后提交进度
message.commit()
consumer.close()
四、消费组管理:共享进度与自动负载均衡
消费组(Consumer Group)是 TDengine 数据订阅实现高可靠、高吞吐消费的关键机制。
4.1 共享消费进度
同一消费组内的所有消费者共享统一的消费进度。这意味着一条数据只会被消费组内的一个消费者处理,避免了数据重复消费的问题。消费进度由服务端(mnode)集中管理,持久化存储,消费者重启后可从断点继续消费。
4.2 自动负载均衡(Rebalance)
TDengine 时序数据库内置了自动 Rebalance 机制,当消费组成员发生变化时,系统会自动重新分配数据分区:
- 新消费者加入:消费组新增成员时,自动将部分 vnode 分配给新成员
- 消费者离开:某个消费者断开连接或主动退出时,其负责的分区自动分配给其他成员
- 故障转移:消费者故障时,系统在数秒内完成分区迁移,保证消费连续性
系统每 2 秒检测一次消费组状态,发现变化时自动触发 Rebalance,以 vnode 为最小分配单元,采用均匀分配策略,并优先保持已有分配关系以减少不必要的数据迁移。
4.3 代码示例:多消费者协同
import taos
# 消费者 A
consumer_a = taos.Consumer(
group_id="g1", # 同一消费组
topics=["topic_meters"],
auto_offset="latest"
)
# 消费者 B(另一进程或机器)
consumer_b = taos.Consumer(
group_id="g1", # 同一消费组
topics=["topic_meters"],
auto_offset="latest"
)
# 两个消费者自动分配不同的 vnode 分区,并行消费
for message in consumer_a:
process(message)
message.commit()
五、进度管理:精确控制消费位置
消费进度管理是确保数据不丢失、不重复的关键。TDengine 时序数据库通过 WAL 版本号精确记录每个 vnode 的消费位置。
5.1 Commit Offset
消费者处理完数据后,需要提交消费进度(Commit Offset),告知服务端已成功处理到哪个位置:
# 方式一:手动提交(推荐)
for message in consumer:
process(message)
message.commit() # 显式提交
# 方式二:批量提交
messages = []
for message in consumer:
messages.append(message)
if len(messages) >= 100:
process_batch(messages)
for msg in messages:
msg.commit()
messages.clear()
5.2 earliest 与 latest 配置
通过 auto.offset 参数控制消费者首次启动时的消费起始位置:
| 配置值 | 行为 | 适用场景 |
|---|---|---|
earliest | 从最早可用的数据开始消费 | 需要全量历史数据处理 |
latest | 仅从最新数据开始消费 | 仅关注实时数据 |
none | 无已提交进度时抛出异常 | 必须保证进度连续性 |
# 从最早数据开始消费(首次启动时回溯全部历史数据)
consumer = taos.Consumer(
group_id="g1",
topics=["topic_meters"],
auto_offset="earliest"
)
# 仅消费实时数据(忽略历史数据)
consumer = taos.Consumer(
group_id="g1",
topics=["topic_meters"],
auto_offset="latest"
)
六、兼容 Kafka 风格 API:降低迁移成本
对于已经使用 Kafka 的团队,TDengine 提供了兼容 Kafka 风格的 API 接口,大幅降低迁移成本。开发者无需学习全新的 API 体系,只需调整连接配置即可完成切换。
6.1 API 对照
| Kafka 概念 | TDengine 对应概念 |
|---|---|
| Topic | Topic(主题) |
| Consumer Group | Consumer Group(消费组) |
| Partition | VNode(虚拟数据节点) |
| Offset | Commit Offset(消费进度) |
| poll() | poll()(拉取数据) |
| commitSync() | commit()(提交进度) |
6.2 迁移示例
# Kafka 风格的消费者代码(TDengine)
from taos import Consumer
consumer = Consumer({
"group.id": "g1",
"td.connect.ip": "127.0.0.1",
"td.connect.port": "6030",
"auto.offset.reset": "earliest"
})
consumer.subscribe(["topic_meters"])
while True:
records = consumer.poll(timeout=1000)
for record in records:
handle(record)
consumer.unsubscribe()
consumer.close()
API 的使用方式与 Kafka 高度一致,包括 subscribe()、poll()、commit()、unsubscribe() 等核心方法,开发者可以快速上手。
七、SQL 预处理:在数据库端完成过滤
SQL 预处理是 TDengine 数据订阅区别于传统消息队列的核心优势。通过在数据库服务端执行过滤、聚合等计算,仅将结果集推送给消费者,可以大幅减少网络传输量和客户端计算压力。
7.1 数据过滤
-- 仅订阅特定设备组的数据
CREATE TOPIC topic_group1 AS
SELECT ts, current, voltage
FROM meters
WHERE groupId = 1;
-- 仅订阅异常数据
CREATE TOPIC topic_alert AS
SELECT ts, location, temperature
FROM sensors
WHERE temperature > 80 OR humidity > 95;
7.2 数据聚合
-- 订阅每 5 分钟的统计指标
CREATE TOPIC topic_stats AS
SELECT _wstart, _wend, location,
AVG(current) AS avg_current,
MAX(voltage) AS max_voltage,
COUNT(*) AS sample_count
FROM meters
WHERE voltage > 200
INTERVAL(5m);
通过 SQL 预处理,原始数据量可能达到每秒百万条级别,但推送给消费者的聚合结果可能仅有每秒数百条,传输量降低上千倍。
八、典型应用场景
8.1 实时监控大屏
工业场景中,监控大屏需要实时展示设备运行状态。通过 TDengine 数据订阅,可以将关键指标实时推送至前端:
consumer = taos.Consumer(
group_id="dashboard_group",
topics=["topic_realtime_stats"],
auto_offset="latest"
)
for message in consumer:
# 将数据推送到 WebSocket 服务
websocket_server.broadcast(message.rows)
message.commit()
8.2 跨系统数据同步
将 TDengine 中的时序数据实时同步到 Elasticsearch、ClickHouse 等其他系统,用于全文检索或离线分析:
consumer = taos.Consumer(
group_id="sync_group",
topics=["topic_meters"],
auto_offset="earliest"
)
for message in consumer:
# 批量写入 Elasticsearch
es_client.bulk_index(message.rows)
message.commit()
8.3 告警通知系统
订阅关键传感器数据,实时检测异常并触发告警通知:
-- 创建告警主题
CREATE TOPIC topic_alert AS
SELECT ts, device_id, temperature, pressure
FROM sensors
WHERE temperature > 100 OR pressure < 0.5;
consumer = taos.Consumer(
group_id="alert_group",
topics=["topic_alert"],
auto_offset="latest"
)
for message in consumer:
for row in message.rows:
send_alert(
device=row["device_id"],
metric="temperature",
value=row["temperature"],
channel=["sms", "email", "dingtalk"]
)
message.commit()
九、总结
TDengine 时序数据库通过内置类消息队列的数据订阅机制,为开发者提供了从数据写入到实时消费的一站式解决方案。主题创建的灵活粒度、消费组的自动负载均衡、精确的进度管理、兼容 Kafka 的 API 设计以及强大的 SQL 预处理能力,使得 TDengine 在实时监控、数据同步、告警通知等场景中展现出显著优势。
相比传统”时序数据库 + 消息队列”的复杂架构,TDengine 的数据订阅功能不仅简化了系统架构、降低了运维成本,更通过零拷贝和推拉结合的传输模式实现了毫秒级的数据推送延迟。如果您正在寻找一款既能高效存储时序数据,又能提供实时数据分发能力的时序数据库,TDengine 无疑是值得深入评估的选择。欢迎访问 TDengine 官方文档中心,获取更多技术细节与最佳实践。
























