TDengine在WebRTC日志上报中的实践

小T导读:天润融通是一家云呼叫中心服务商,其中CTI-Cloud为大量头部客户提供高效、稳定的呼叫中心服务。现在,天润通过T-Phone SDK将CTI-Cloud的功能延伸到移动端,为客户提供移动端的呼叫服务。

应用场景

在天润的T-Phone SDK中,我们需要采集WebRTC信息来进行数据的分析并作出优化的建议,所以需要将SDK中采集到的相关日志进行上报;为了精简日志上报的数据,我们只针对其中的传输数据每隔5秒在双方接通后上传,针对传输中网络的抖动和链接状态可以数据化展示,提供对每一通话的数据分析,以便在后续SDK演进中提供数据支撑;另外我们对每一通电话做了操作日志,记录了接口被调用的操作和时间,为用户在某一通电话的操作记录做还原,分析可能的误操作等,为客户提供更好的交互体验。

因为现在仍处于项目初期,我们更关心用户在某一个时间段内的使用情况,在大量使用的场景中是否仍然能保证较高的通话质量,同时我们应该尽可能做到对每个座席都可以进行分析,做到每一个座席都应该有自己的数据表。

举个例子:如果我们要查询企业7000001的座席9001 2020年2月14日12:00-12:10分的一通通话的WebRTC日志,如果没有按照座席进行分表,SQL语句应该是这样:

select log_time, audio_bytes_sent,...  from aladdin.webrtc_log where device_id = '70000019001' and where log_time between 1581652800000 and 1581653400000;

如果要提升查询的速度,我们首先要对device_idlog_time字段建立索引,但是当数据量比较大的时候,索引的存储也会是问题,所以要考虑分表(我们之前使用的数据库是aws的rds,所以没有分库的概念)

分表的选择有两种,按照时间分表或者按照座席分表。为什么我们要按照座席分表?如果按照时间分表,这样就会出现不同表的数据量差异过大,甚至存在某个表里没有数据的情况,因为很少有人半夜做外呼。但是我们也不能这样武断的不为半夜的时间段建立表,万一人打的是国际长途呢?但是一个座席不可能存在不外呼的情况,而且对于移动端的应用,我们在排查问题时更多是通过某个座席向我们反馈发生的问题,我们再针对这个座席进行排查,所以在查询的时候device_id这个字段是必须要体现的,如果按照device_id进行分表,我们在查询的时候就不再需要对这个字段建立索引了。因而选择按照座席进行分表。

如果要使用传统的数据库做分表,我们在插入数据之前一定要先判断这张表是否存在,同时我们还需要提前创建好这些表。这种步骤在我看来就显得很鸡肋。如果能有数据库可以做到在插入数据时指定表名,如果存在则插入,如果不存在则自动创建表,这样就方便多了。

日志上报的整体处理流程

整个流程需要T-Phone SDK,CTI-Cloud的Interface模块(CTI-Cloud对客户开放的接口)和日志上报模块相互协作

TDengine在WebRTC日志上报中的实践 - TDengine Database 时序数据库

设计

考虑到日志上报的频率较高,对IO吞吐的要求比较高。我们可以通过全异步的方式进行数据的采集。这次使用了Vert.x作为全异步项目开发的工具。

在数据存储上基于以下几点考虑我们选择了TDengine Database

1. 不管是WebRTC日志还是操作日志,都是按照时间产生的数据流。而TDengine正好是一个专门为物联网结构化数据流设计的时序数据库

2. WebRTC日志和操作日志存储的数据格式都是一致的,但是如果要做到都每个使用的座席都可以进行分析,最好的方式是每个座席都能有一张自己的数据表。TDengine提供了超级表,在超级表中定义数据结构,并按照tag区分,只要在插入数据时指定表名即可做到分表。显然解决了上述的鸡肋问题。按照TDengine官网上的介绍:

为充分利用其数据的时序性和其他数据特点,TDengine要求对每个数据采集点单独建表。

其实我们的座席就相当于是一个独立的数据采集点,TDengine在我们的场景中是很贴合业务的。

3. 时间。时间也是我们在查询中重点关注的部分,在传统的数据库中,我们需要通过对字段建立索引来提升查询速度,可是我们仍然不想建立索引,因为索引仍需要占用存储空间,我们是否可以通过类似分表的方式来取代索引呢?答案是肯定的:

TDengine中写入的数据在硬盘上是按时间维度进行分片的。同一个vnode中的表在同一时间范围内的数据都存放在同一文件组中。这一数据分片方式可以大大简化数据在时间维度的查询,提高查询速度。在默认配置下,硬盘上的每个数据文件存放10天数据。用户可根据需要修改系统配置参数daysPerFile进行个性化配置。

4. 插入和查询的速度要快,稳定。

在我们的开发服务器上尝试了一下TDengine Database。和官网上介绍的出入不大,查询和存储速度确实很快,而且也不依赖其他文件系统,所以就使用TDengine作为这个模块的存储引擎。由于TDengine中对列有长度限制,最长4096,而且我们上报的字段比较多,所以尽量分配好每个字段的长度。

在数据的采集过程中,TPhone SDK不会直接和我们进行数据交互,而是会先将数据存储到SQS中,我们再从SQS中拉取数据,然后对数据处理后进行存储。

先来创建一个超级表,tdengine提供的超级表在我看来还是很方便的,我们可以直接利用超级表来做到自动的对数据进行分表存储。

create database aladdin;

use aladdin;

create table webrtc_log(
 createTime timestamp,
 deviceId binary(100),
 audioBytesSent bigint,
 audioBytesReceived bigint,
 ...
 ssrcSendGoogCurrentDelayMs int,
 ssrcSendGoogJitterBufferMs int
) tags (
  deviceIdTag binary(100)
);

TDengine提供了非常多的连接方式,为了更好的配合Vertx进行异步存储,我们在这里使用了Rest方式进行数据库操作。

开始

在有了整体思路之后我们开始上手开发:

1. 应用配置:

{
  "aws.region": "<your aws region>",
  "aws.accessKey": "<your aws ak>",
  "aws.secretAccessKey": "<your aws sk>",
  "aladdin.maxPool": 100,
  "aladdin.maxWaitQueue": 1500,
  "aladdin.queue.name": ["queuename1","queuename2"],
  "aladdin.cache.expireAfterWrite": 30,
  "aladdin.cache.expireAfterAccess": 30,
  "tdengine.host": "<your tdengine host>",
  "tdengine.port": 6020,
  "tdengine.user": "root",
  "tdengine.password": "<your tdengine password>"
}

2. 重写Launcher

import com.tinet.twatch.aladdin.config.Configurer;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Launcher;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.SLF4JLogDelegateFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author qianwj
 * @since v1.0
 */
public class AladdinLauncher extends Launcher {

  private static Configurer configurer = new Configurer();

  private Logger logger = LoggerFactory.getLogger(AladdinLauncher.class);

  public static void main(String[] args) {
    System.setProperty("vertx.logger-delegate-factory-class-name", SLF4JLogDelegateFactory.class.getName());
    new AladdinLauncher().dispatch(args);
  }

  @Override
  public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
    logger.info("Loading config starting...");
    JsonObject config = configurer.load();
    JsonObject local = deploymentOptions.getConfig();
    if (!config.isEmpty()) { // 将consul配置注入到context中
      local.mergeIn(config);
      deploymentOptions.setConfig(local);
    }
    super.beforeDeployingVerticle(deploymentOptions);
    logger.info("Loading config completed, config: {}", deploymentOptions.getConfig());
  }

  @Override
  public void afterConfigParsed(JsonObject config) {
    logger.info("Loading local config complete, local config: {}", config.encodePrettily());
  }

  @Override
  public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) {
    logger.error("Deploy verticle occur exception: {}, App will be closed immediately!", cause.getLocalizedMessage(), cause);
    vertx.close();
  }
}

其实写完第二步就可以知道这个配置文件存在不是必要的,我们使用了Consul作为配置中心来进行集中配置,这一步主要是为了注入consul的配置以及加载日志。

3. 拉取SQS中的数据

import com.amazonaws.AmazonServiceException;
import com.tinet.ctilink.yun.entity.YunMessage;
import com.tinet.twatch.aladdin.service.AwsSQSService;
import com.tinet.twatch.aladdin.config.Configurer;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class DataCollectVerticle extends AbstractVerticle {

  private Logger logger = LoggerFactory.getLogger(DataCollectVerticle.class);

  private volatile boolean shutdown = false;

  @Override
  public void start() throws Exception {
    logger.info("DataCollectVerticle starting...");
    AwsSQSService sqsService = Configurer.sqsService();
    EventBus bus = vertx.eventBus();
    vertx.setPeriodic(1000, id -> {
      try {
        if (shutdown) {
          vertx.cancelTimer(id);
        }
        JsonArray array = config().getJsonArray(Configurer.QUEUE_URL);
        List<YunMessage> msgs = sqsService.receiveMessageAndDelete(array.getString(0));
        List<YunMessage> userActionMsgs = sqsService.receiveMessageAndDelete(array.getString(1));
        bus.send(Configurer.CHANNEL_ADDRESS, Json.encode(msgs));
      } catch (AmazonServiceException e) {
        logger.warn("msgs received failed, cause: {}", e.getLocalizedMessage(), e);
      }
    });
  }



  @Override
  public void stop() throws Exception {
    shutdown = true;
    logger.info("DataCollectVerticle closing...");
  }
}

4. 将数据存储到TDengine中

import com.github.benmanes.caffeine.cache.Cache;
import com.tinet.ctilink.yun.entity.YunMessage;
import com.tinet.twatch.aladdin.DataOperator;
import com.tinet.twatch.aladdin.config.Configurer;
import com.tinet.twatch.aladdin.model.WebRTCLog;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class SaveVerticle extends AbstractVerticle {

  private Logger logger = LoggerFactory.getLogger(SaveVerticle.class);

  @Override
  public void start() throws Exception {
    logger.info("SaveVerticle starting....");

    // 从event bus接收数据
    EventBus bus = vertx.eventBus();
    bus.consumer(Configurer.CHANNEL_ADDRESS, (Handler<Message<String>>) msg -> {
      JsonArray coming = new JsonArray(msg.body());
      if (coming != null)
        save(coming);
    });
  }

  private void save(JsonArray array) {
    WebClient client = Configurer.tdClient();
    List<WebRTCLog> data = new ArrayList<>();
    Cache<String, WebRTCLog> cache = Configurer.cache();
    if (array.size() > 0) {
      final WebRTCLog empty = new WebRTCLog();
      for (int i = 0; i < array.size(); i++) {
        String message = array.getJsonObject(i).mapTo(YunMessage.class).getBody();
        try {
          JsonObject json = DataOperator.toJsonObject(message);
          WebRTCLog log = json.mapTo(WebRTCLog.class);
          String cacheKey = log.getDeviceId();
          WebRTCLog org = cache.get(cacheKey, k -> empty);
          if (!Objects.equals(org, empty)) { // 如果不是第一次插入
            DataOperator.merge(log, org);
          }
          cache.put(cacheKey, log);
          data.add(log);
        } catch (Exception e) {
          logger.error("log saved failed, cause: {}", e.getLocalizedMessage(), e);
        }
      }
      client.post("/rest/sql")
        .basicAuthentication(config().getString("tdengine.user"), config().getString("tdengine.password"))
        .sendBuffer(insert(data), ar -> {
            if (ar.succeeded()) {
                HttpResponse<Buffer> response = ar.result();
                if (response != null) {
                    JsonObject res = response.bodyAsJsonObject();
                    if (!"succ".equals(res.getString("status"))) {
                        logger.warn("data insert failed! data: {}, cause: {}", Json.encode(data), res.getString("desc"));
                    }
                }
            } else {
                logger.error("data insert failed! {}", Json.encode(data), ar.cause());
            }
        });
    }
  }

  private Buffer insert(WebRTCLog log) throws Exception {
    String formatter = "INSERT INTO ALADDIN.WEBRTC_LOG_%s " +
                       "  USING ALADDIN.WEBRTC_LOG TAGS(%s) " +
                       "VALUES(%s)";
    String sql = String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log));
    return Buffer.buffer(sql);
  }

  private Buffer insert(List<WebRTCLog> data) throws IllegalAccessException {
    StringBuilder sqlBuilder = new StringBuilder("INSERT INTO ");
    String formatter = "ALADDIN.WEBRTC_LOG_%s USING ALADDIN.WEBRTC_LOG TAGS(%s) VALUES(%s) ";
    for (WebRTCLog log : data) {
      sqlBuilder.append(String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log)));
    }
    return Buffer.buffer(sqlBuilder.toString());
  }

  @Override
  public void stop() throws Exception {
    logger.info("SaveVerticle closing....");
  }
}

5. 部署Verticle

import com.tinet.twatch.aladdin.config.Configurer;
import com.tinet.twatch.aladdin.verticle.DataCollectVerticle;
import com.tinet.twatch.aladdin.verticle.SaveVerticle;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.client.WebClientOptions;

public class MainVerticle extends AbstractVerticle {

  private Logger logger = LoggerFactory.getLogger(MainVerticle.class);

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    logger.info("MainVerticle starting...");
    // 初始化sqs
    String region = config().getString("aws.region");
    String accessKey = config().getString("aws.accessKey");
    String secretKey = config().getString("aws.secretAccessKey");
    Configurer.initSQSService(region, accessKey, secretKey, config().getJsonArray(Configurer.QUEUE_URL));

    DeploymentOptions dataCollectDeploymentOptions = new DeploymentOptions();
    dataCollectDeploymentOptions.setInstances(1);
    dataCollectDeploymentOptions.setConfig(config());
    dataCollectDeploymentOptions.setWorker(true);
    Configurer.initCache(config().getInteger("aladdin.cache.expireAfterWrite"), config().getInteger("aladdin.cache.expireAfterAccess"));
    vertx.deployVerticle(DataCollectVerticle.class.getName(), dataCollectDeploymentOptions, ar -> {
      if (ar.succeeded()) {
        logger.info("DataCollectVerticle started!");
      } else {
        logger.warn("DataCollectVerticle deploy failed! {}", ar.cause().getLocalizedMessage(), ar.cause());
      }
    });
    // 初始化webclient
    WebClientOptions options = new WebClientOptions();
    options.setMaxWaitQueueSize(config().getInteger("aladdin.maxWaitQueue"));
    options.setMaxPoolSize(config().getInteger("aladdin.maxPool"));
    options.setDefaultHost(config().getString("tdengine.host"));
    options.setDefaultPort(config().getInteger("tdengine.port"));
    Configurer.initTDClient(vertx, options);
    
    DeploymentOptions saveDeploymentOptions = new DeploymentOptions();
    saveDeploymentOptions.setInstances(1);
    saveDeploymentOptions.setConfig(config());
    vertx.deployVerticle(SaveVerticle.class.getName(), saveDeploymentOptions, ar -> {
      if (ar.succeeded()) {
        logger.info("SaveVerticle started!");
      } else {
        logger.warn("SaveVerticle deploy failed!");
      }
    });
  }
}

这样就快速实现了一个日志上报的模块,且多个实例部署时相互之间不会产生影响,当然在实际的生产环境中,我们需要考虑的会更多。

当然,日志上报只是开始。在之后的项目开发中,我还会继续向大家介绍TDengine Database在数据分析中的应用实践,感谢观看。

作者简介:钱文锦 ,天润融通基础研发部研发工程师,开源社区爱好者,目前主要负责天润融通T-Phone SDK/CTI-Cloud相关功能开发和应用。

本文首发于:http://blog.ti-net.com.cn/tdenginezwebrtcrzsbzdsj/?from=groupmessage&isappinstalled=0