TAOS-JDBCDriver 中订阅的用途和用法

本文介绍了 TAOS-JDBCDriver 订阅功能的使用场景、使用方法和一些限制。本文的预期读者是基于 TAOS-JDBCDriver 开发各种应用的软件开发人员。

如何使用TDengine中的订阅功能?

TDengine的Java订阅接口,目前是与TAOS-JDBCDriver 的 API 配合使用,相关源码文件也在TDengine JDBC驱动的源码所在目录下:TSDBSubscribe.javaTSDBSubscribeCallBack.java。与订阅相关的方法主要有以下三个,均在TSDBSubscribe.java中定义和实现:

subscribe
public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack) throws SQLException
订阅一个topic,并设置订阅内容、轮询周期、以及异步调用选择。如果订阅成功,此方法会返回一个long型值(本质是连接的一个指针值,此处用于表征此成功建立的订阅);如果订阅失败,则此方法应直接抛出异常。
topic:订阅的名字。
sql:订阅的内容,用户可以传入一条查询SQL语句,此时订阅关注的内容为此查询结果集中的新增记录。
period:订阅执行时,内部轮询的时间。
callBack:执行异步订阅时的回调,空值表示不使用异步。
public TSDBResultSet consume(long subscription) throws OperationsException, SQLException
获取最新的订阅结果,订阅结果返回为一个TSDBResultSet类的实例。
subscription:subscribe成功后返回的long型值。
public void unsubscribe(long subscription, boolean isKeep) throws SQLException
取消已经建立的订阅,如果失败则直接抛出异常。
subscription:subscribe成功后返回的long型值。
isKeep:是否保持订阅的记录。

下面结合一个示例,介绍下其使用方法。

首先是创建订阅:

/**
 * sync subscribe
 *
 * @param topic
 * @param sql
 * @param restart
 * @param period
 * @return
 * @throws SQLException
 */
public long subscribe(String topic, String sql, boolean restart, int period)


/**
 * async subscribe
 *
 * @param topic
 * @param sql
 * @param restart
 * @param period
 * @param callBack
 * @throws SQLException
 */
public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack)
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":" + port + "/" + dbName + "?user=root&password=taosdata"
        , properties);
String rawSql = "select * from devices ;";
subscribe = ((TSDBConnection) connection).createSubscribe();
subscribId = subscribe.subscribe(topic, rawSql, false, 1000);

TDengine中的订阅既可以是同步的,也可以是异步的。这里,同步的意思是用户程序要直接调用  consume 来拉取数据,而异步则由 API 在内部的另一个线程中调用 consume,然后把拉取到的数据交给回调函数 callback 去处理。

注意,这里没有指定起始时间,所以会读到所有时间的数据。如果只想从一天前的数据开始订阅,而不需要更早的历史数据,可以再加上一个时间条件:

select * from devices where ts > now - 1d and temperature > 80;

订阅的 topic 实际上是它的名字,因为订阅功能是在客户端 API 中实现的,所以没必要保证它全局唯一,但需要它在一台客户端机器上唯一。

如果名称为 topic 的订阅不存在,参数 restart 没有意义;但如果用户程序创建这个订阅后退出,当它再次启动并重新使用这个 topic 时,restart 就会被用于决定是从头开始读取数据,还是接续上次的位置进行读取。本例中,如果 restarttrue,用户程序肯定会读到所有数据。但如果这个订阅之前就存在了,并且已经读取了一部分数据,且 restart false,用户程序就不会读到之前已经读取的数据了。 

subscribe 的最后一个参数是以毫秒为单位的轮询周期(间隔需要大于 1000 )。在同步模式下,如过前后两次调用 consume 的时间间隔小于此时间,consume 会阻塞,直到间隔超过此时间。异步模式下,这个时间是两次调用回调函数的最小时间间隔。

当要结束一次数据订阅时,需要调用 unsubscribe:

/**
 * cancel subscribe
 *
 * @param subscription
 * @param isKeep
 * @throws SQLException
 */
 public void unsubscribe(long subscription, boolean isKeep)

其第二个参数,用于决定是否在客户端保留订阅的进度信息,如果大家还记得前面说过“订阅功能是在客户端 API 中实现的”的话,应该可以猜到,如果这个参数是 false,那无论下次调用 subscribe 的时的 restart 参数是什么,订阅都只能重新开始了。另外,进度信息的保存位置是 {DataDir}/subscribe/,这个目录下,每个订阅有一个与其 topic 同名的文件,删掉某个文件,同样会导致下次创建其对应的订阅时只能重新开始。

限制条件

下面是一些 TDengine 订阅功能的局限,大家需要在使用中注意。

  • 订阅的查询语句只能是 select 语句,只能查询原始数据(不支持聚合函数),只能按时间正序查询数据。
  • 在满足应用需求的情况下,请尽量将轮询周期设置的大一些,否则会对系统性能造成影响。
  • 暂不支持乱序数据,用户程序可能读不到使用 import 方式插入的数据。
  • 如果用户程序异常退出或没有正确调用 unsubscribe,进度信息可能会有错误,这时,后续的同名订阅可能读到之前已经读过的数据。