周边生态贡献者 + 1,一个 TDengine 的 Python ORM 库—crown

本文介绍了一个用于操作 TDengine 的 Python ORM 库。本文的预期读者是,需要使用 Python 语言操作 TDengine 数据库的开发人员。

项目地址:

https://github.com/machine-w/crown

一、什么是 ORM ?

ORM 就是对象关系映射(Object Relational Mapping),是一种程序设计技术,用于实现面向对象编程语言里不同类型系统的数据之间的转换。从效果上说,它其实是创建了一个可在编程语言里使用的“虚拟对象数据库”。简单说来就是,通过建立类与数据库表,对象与数据库数据条目的对应关系。从而可以通过编程语言的数据类型操作数据库。

二、为 TDengine 开发 ORM 库的动因

作为一个使用 Python 作为主力编程语言的开发者,笔者经常要编写操作各种数据库的代码。对于键值对类型(如:Redis)或者文档类型(如:MongoDB)的数据库,Python 生态都提供了很好的第三方连接库。而对于最常使用的关系型数据( 如:MySQL、PostgreSQL),Python 则提供了SQLAlchemy、Peewee 等 ORM 第三方库的解决方案。所以,笔者日常工作中,需要手工拼接 SQL 查询语句的场景非常少 ( 甚至慢慢忘记了这项技能)。

近来,笔者需要带领团队完成一个智能电力系统的项目。在技术选型过程中发现了优秀的物联网大数据平台 TDengine 。经过测试和评估发现,无论从超高性能和稳定性、还是简洁的设计、开源的理念。TDengine 都非常适合作为智能电力系统的基础平台使用。但是,在使用过程中,我们发现了一个比较棘手的问题。那就是:由于 TDengine 诞生不久,相比较其他已经发展很多年的其他数据库平台,周边的相关生态软件还略少一些。特别是,苹果操作系统 OS X 下暂时没有原生连接器可用,写好的程序需要拿到 Linux 上去调试。这对于被“宠坏”的 Python 程序员来讲真得没法适应。而且考虑到笔者团队中其他程序员都习惯了 ORM 的操作方式,对原始 SQL 并不熟悉。所以,笔者意识到:如果使用原生的连接器进行开发,将会遇到很多困难。于是就开始了 TDengine 的开源 ORM 库的开发。一方面,可以帮助团队更高效的完成系统开发工作。另外一方面,也可以为帮助 TDengine 更好的完善生态工具链。

GitHub上的页面

三、如何安装和使用

3.1 简介

  • 需要 Python 3.0版本以上
  • 在 TDengine 2.0.8 版本测试通过
  • 解决 Mac 操作系统下没有原生 Python 连接器的问题
  • 极大的降低了 Python 程序员使用 TDengine 技术门槛
  • 可以方便的将数据转换到 numpy 与 pandas

由于目前 TDengine 没有提供 Mac 操作系统下的原生 client , 为保证库的兼容性,目前 crown 库底层使用的 RESTful 接口进行连接。以后的版本中,笔者将提供在 Windows 和 Linux 下的原生连接器接口可供配置使用。

项目地址:

https://github.com/machine-w/crown

3.2 安装

crown 库像其他 Python 第三方库一样,可以通过 pip ,轻松安装最新版本:

pip install crown

还可以通过git安装,使用方法:

git clone https://github.com/machine-w/crown.gitcd crowmpython setup.py install

3.3 简单使用

1. 连接数据库

使用 crown 连接 TDengine ,只需要提供 taos RESTful 服务的地址、端口号、以及需要操作的数据库名。然后即可使用 TdEngineDatabase 类新建一个数据库对象。以下为连接数据库的例子代码:

from crown import * #导入库
DATABASENAME = 'taos_test'  #数据库名
HOST = 'localhost'
PORT = 6041 
db = TdEngineDatabase(DATABASENAME) # 默认端口 6041,默认用户名:root,默认密码:taosdata
#如不使用默认参数,可以使用下面的方法提供参数
# db = TdEngineDatabase(DATABASENAME,host=HOST,port=PORT,user='yourusername',passwd='yourpassword') 

 # 一般情况我们使用connect方法尝试连接数据库,如果当前数据库不存在,则会自动建库。
db.connect() 
# 连接数据库后,db对象后会自动获取全部数据库信息,以字典的形式保存在属性databases中。
print(db.databases) 

#当然也可以使用手动建库方法建立数据库。
db.create_database(safe=True)   #参数safe为True表示:如果库存在,则跳过建库指令。
#可选字段:建库时配置数据库参数,具体字段含义请参考tdengine文档。
# db.create_database(safe=True,keep= 100,comp=0,replica=1,quorum=2,blocks=115) 

#可以通过调用alter_database方法修改数据库参数。
db.alter_database(keep= 120,comp=1,replica=1,quorum=1,blocks=156) 

#删除当前数据库方法drop_database
db.drop_database(safe=True) #参数safe:如果库不存在,则跳过删库指令。

理论上使用 crown 库操作 TDengine ,所有的数据库操作都无需手工拼装 SQL 语句,但是为了应对比较特殊的应用场景, crown 库也提供了执行原始 SQL 语句的功能。

#通过数据库对象的raw_sql方法直接执行sql语句,语句规则与TDengine restful接口要求一致。
res = db.raw_sql('select c1,c2 from taos_test.member1')
print(res)
print(res.head)
print(res.rowcount) 
#返回的对象为二维数据。res.head属性为数组对象,保存每一行数据的代表的列名。res.rowcount属性保存返回行数。
# res: [[1.2,2.2],[1.3,2.1],[1.5,2.0],[1.6,2.1]]
# res.head: ['c1','c2']
# res.rowcount: 4

2. 建表删表操作

建好数据库对象后,就可以通过为 model 类建立子类的方式定义并新建数据库表(使用方法和 Python 中常用的 ORM 库类似),新建的一个类对应数据库中的一张表,类的一个对象对应表中一条数据。以下示例创建一个简单的数据库表:

# 表模型类继承自Model类,每个模型类对应数据库中的一张表,模型类中定义的每个Field,对应表中的一列,
# 如果不明确定义主键类型字段, 会默认添加一个主键,主键名为 “ts”
class Meter1(Model):
    cur = FloatField()  #如果省略列名参数,则使用属性名作为列名
    curInt = IntegerField(db_column='c2')
    curDouble = DoubleField(db_column='c3')
    desc = BinaryField(db_column='des')
    # custom_ts = PrimaryKeyField() # 如果定义了主键列,则使用主键列名作为主键

    class Meta: # Meta子类中定义模型类的配置信息
        database = db # 指定之前建的数据库对象
        db_table = 'meter1' # 指定表名

crown 支持的字段类型与 TDengine 字段类型的对应关系:

crown 支持的字段类型与 TDengine 字段类型的对应关系

定义好表模型后,即可调用类方法 create_table 进行建表操作:

# create_table运行成功返回True,失败则raise错误
Meter1.create_table(safe=True) #safe:如果表存在,则跳过建表指令
#也可以通过数据库对象的create_table方法进行建表。
# db.create_table(Meter1,safe=True) 

# drop_table方法进行删除表操作,运行成功返回True,失败则raise错误
Meter1.drop_table(safe=True) #safe:如果表不存在,则跳过删表指令
#同样可以通过数据库对象删表,功能同上
# db.drop_table(Meter1,safe=True) 

#table_exists方法查看表是否存在,存在返回True,不存在返回:False
Meter1.table_exists()

3. 数据插入

可以通过新建的数据表类 Meter1 新建数据对象并传入具体字段的数值,然后使用对象的 save 方法插入数据。

也可以直接使用 Meter1 类的类方法 insert 直接插入数据。下面的例子分别演示了这两种方法:

import time
#方法一
for i in range(1,101):
    #使用模型类实例化的每个对象对应数据表中的每一行,可以通过传入属性参数的方式给每一列赋值
    m = Meter1(cur = 1/i,curInt=i,curDouble=1/i+10,desc='g1',ts= datetime.datetime.now())
    time.sleep(1)
    #使用对象的save方法将数据存入数据库
    m.save()
print(Meter1.select().count()) # 结果:100

#方法二
for i in range(1,101):
    #也可以直接使用模型类的insert方法插入数据。
    Meter1.insert(cur = 1/i,curInt=i,curDouble=1/i+10,desc='g1',ts= datetime.datetime.now() - datetime.timedelta(seconds=(102-i)))
print(Meter1.select().count()) # 结果:101

如果不传入时间属性 ts ,则会以当前时刻为默认值传入

Meter1.insert(cur = 1/i,curInt=i,curDouble=1/i+10,desc='g1')
m = Meter1(cur = 1/i,curInt=i,curDouble=1/i+10,desc='g1')

4. 数据查询

crown 提供了丰富的数据查询功能,由于篇幅的原因,这里只介绍笔者项目中比较常用的几种查询。了解更多的查询使用方法,请查看项目文档:

https://github.com/machine-w/crown/blob/main/README.rst

单条数据查询:使用 Meter1 类的 select() 方法可以获取表的查询对象,查询对象的 one 方法可以获取满足条件的第一条数据。

#获取一条数据:使用select()类方法获取查询字段(参数留空表示取全部字段),然后可以链式使用one方法获取第一条数据
res = Meter1.select().one()
print(res.desc,res.curDouble,res.curInt,res.cur,res.ts)

#select函数中可以选择要读取的字段
res = Meter1.select(Meter1.cur,Meter1.desc).one()
print(res.desc,res.curDouble,res.curInt,res.cur,res.ts)

多条数据查询:使用 Meter1 类的 select() 方法可以获取表的查询对象,查询对象的 all 方法可以获取满足条件的全部数据。

#使用select()类方法获取查询字段(参数留空表示取全部字段),然后可以链式使用all方法获取全部数据
res_all = Meter1.select().all()
for res in res_all:
    print(res.desc,res.curDouble,res.curInt,res.cur,res.ts)

#select函数中可以选择要读取的字段
res_all = Meter1.select(Meter1.cur,Meter1.desc).all()
for res in res_all:
    print(res.desc,res.curDouble,res.curInt,res.cur,res.ts)

读取数据导入 numpy 和 pandas :虽然 TDengine 提供了很多聚合和统计函数,但是把时序数据导入 numpy 或 pandas 等数据分析组件中进行处理的情况也是很常见的操作。

下面演示如何通过 crown 把结果数据导入 numpy 和 pandas :

#导入numpy
#通过all_raw函数可以获取二维数组格式的数据查询结果。结果每列代表的标题保存在结果对象的head属性中。
raw_results = Meter1.select(Meter1.cur,Meter1.curInt,Meter1.curDouble).all_raw()
#可以很方便的将结果转换为numpy数组对象
np_data = np.array(raw_results)
print(np_data)
print(raw_results.head)

#导入pandas
raw_results = Meter1.select().all_raw()
#使用以下方法,可以轻松的将数据导入pandas,并且使用时间点作为index,使用返回的数据标题作为列名。
pd_data = pd.DataFrame(raw_results,columns=raw_results.head).set_index('ts')
print(pd_data)

选择列四则运算

#使用select()类方法获取查询字段时,可以返回某列或多列间的值加、减、乘、除、取余计算结果(+ - * / %)
res_all = Meter1.select((Meter1.curDouble+Meter1.cur),Meter1.ts).all()
for res in res_all:
    #返回的结果对象可以用get方法获取原始计算式结果
    print(res.get(Meter1.curDouble+Meter1.cur),res.ts) 

#字段别名
#给运算式起别名(不仅运算式,其他放在select函数中的任何属性都可以使用别名)
res_all = Meter1.select(((Meter1.curDouble+Meter1.cur)*Meter1.curDouble).alias('new_name'),Meter1.ts).all() 
for res in res_all:
    #使用别名获取运算结果
    print(res.new_name,res.ts)

where函数

#可以在select函数后链式调用where函数进行条件限
one_time =datetime.datetime.now() - datetime.timedelta(hours=10)
ress = Meter1.select().where(Meter1.ts > one_time).all()
#限定条件可以使用 > < == >= <= != and or ! 等。字符类型的字段可以使用 % 作为模糊查询(相当于like)
ress = Meter1.select().where(Meter1.cur > 0 or Meter1.desc % 'g%').all()
#where函数可以接收任意多参数,每个参数为一个限定条件,参数条件之间为"与"的关系。
ress = Meter1.select().where(Meter1.cur > 0, Meter1.ts > one_time, Meter1.desc % '%1').all()

分页与limit

#可以在select函数后链式调用paginate函数进行分页操作,以下例子为取第6页 每页5条数据。
ress_1 = Meter1.select().paginate(6,page_size=5).all()
ress_2 = Meter1.select().paginate(6).all() #默认page_size为20
#可以在select函数后链式调用limit函数和offset函数条数限制和定位操作。
ress_3 = Meter1.select().limit(2).offset(5).all()
ress_4 = Meter1.select().limit(2).all()

排序:目前 TDengine 只支持主键排序

#可以在select函数后链式调用desc或者asc函数进行时间轴的正序或者倒序查询
res = Meter1.select().desc().one()

聚合函数:TDengine 提供了许多聚合函数可供使用,可以直接返回聚合结果,极大的提高数据聚合效率。crown 几乎完全兼容 TDengine 全部的聚合函数,只需要调用对应的方法即可使用。

以下是使用的例子。

#count
count = Meter1.select().count() #统计行数
print(count) # 结果:100
count = Meter1.select().count(Meter1.desc) #统计指定列非空行数
print(count) # 结果:90

#avg(sum,stddev,min,max,first,last,last_row,spread使用方法与avg相同)
avg1 = Meter1.select().avg(Meter1.cur,Meter1.curDouble.alias('aa')) #可以同时获取多列,并且可以使用别名
print(avg1.get(Meter1.cur.avg()),avg1.aa) #打印统计结果

#twa 必须配合where函数,且必须选择时间段
twa1 = Meter1.select().where(Meter1.ts > datetime.datetime(2020, 11, 19, 15, 9, 12, 946118),Meter1.ts < datetime.datetime.now()).twa(Meter1.cur,Meter1.curDouble.alias('aa'))
print(twa1.get(Meter1.cur.twa()),avg1.aa) #打印统计结果

#diff
diffs = Meter1.select().diff(Meter1.curInt.alias('aa')) #diff目前只可以聚合一个属性。
for diff1 in diffs:
    print(diff1.aa,diff1.ts) # 时间点数据同时返回

#top(bottom函数使用方式相同)
# top函数需要提供要统计的属性,行数,以及别名
tops = Meter1.select().top(Meter1.cur,3,alias='aa') 
for top1 in tops:
    print(top1.aa,top1.ts) # 时间点数据同时返回
tops = Meter1.select().top(Meter1.cur,3) # 可以不指定别名
for top1 in tops:
    #不指定别名,需用使用get方法获取属性
    print(top1.get(Meter1.cur.top(3))) 

#percentile (apercentile函数使用方式相同) 
#每个属性参数为一个元组(数组),分别定义要统计的属性,P值(P值取值范围0≤P≤100),可选别名。
percentile1 = Meter1.select().percentile((Meter1.cur,1,'aa'),(Meter1.curDouble,2)) 
print(percentile1.aa)
#不指定别名,需用使用get方法获取属性
print(percentile1.get(Meter1.curDouble.percentile(2)))

#leastsquares
#每个属性参数为一个元组(数组),分别定义要统计的属性,start_val(自变量初始值),step_val(自变量的步长值),可选别名。
leastsquares1 = Meter1.select().leastsquares((Meter1.cur,1,1,'aa'),(Meter1.curDouble,2,2)) 
print(leastsquares1.aa) # 结果:{slop:-0.001595, intercept:0.212111}
#不指定别名,需用使用get方法获取属性
print(leastsquares1.get(Meter1.curDouble.leastsquares(2,2)))

注意:当前版本并不支持多表 join 查询,需要多表查询的情况请使用 raw_sql 函数,执行原始 SQL 语句。后期版本会补充 join 功能。

5. 超级表定义

超级表定义与普通表的区别在于继承自 SuperModel 。而且,在 Meta 类中,可以定义标签。超级表与普通表的查询操作使用方式相同,以上介绍的所有方法也可以在超级表类中使用,查询操作时标签字段也可以当作普通字段一样操作。

# 超级表模型类继承自SuperModel类
class Meters(SuperModel):
    cur = FloatField(db_column='c1')
    curInt = IntegerField(db_column='c2')
    curDouble = DoubleField(db_column='c3')
    desc = BinaryField(db_column='des')
    class Meta:
        database = db
        db_table = 'meters'
        # Meta类中定义的Field,为超级表的标签
        location = BinaryField(max_length=30)
        groupid = IntegerField(db_column='gid')

#建立超级表
Meters.create_table(safe=True) 
#删除超级表
Meters.drop_table(safe=True) 
#查看超级表是否存在
Meters.supertable_exists()

6. 从超级表建立子表

对于数据插入操作,就需要从超级表中建立子表。可以使用 Meters 类的 create_son_table 方法创建子表。该方法返回一个子表对应的类对象。该对象可以和以上介绍的普通表类对象(Meter1)一样使用。

#生成字表模型类的同时,自动在数据库中建表。
SonTable_d3 = Meters.create_son_table('d3',location='beijing',groupid=3)

# SonTable_d3的使用方法和继承自Modle类的模型类一样。可以进行插入与查询操作
SonTable_d3.table_exists() 

#子表中插入数据
m = SonTable_d3(cur = 65.8,curInt=10,curDouble=1.1,desc='g1',ts = datetime.datetime.now())
m.save()

上面介绍了 TDengine 的 Python ORM 连接库 crown 的基本安装和使用方法。除了上面介绍的内容, crown 还提供了很多非常实用的功能,比如动态建表、根据表名获取模型类、分组查询等。有兴趣的读者可以前往 GitHub 上查看使用文档。也欢迎在 GitHub 上多提宝贵意见与通报 bug 。笔者将持续维护该项目,努力提供更加丰富的功能和更加完备的文档供大家使用。

结语

TDengine 作为优秀的国产开源软件,拥有优雅的软件设计和出色的性能表现。非常适合在物联网大数据应用场景下作为数据基础平台使用。未来随着物联网行业的蓬勃发展, TDengine 也必将成为物联网大数据基础架构的一部分而备受全世界相关领域从业者的广泛关注。笔者作为一个物联网行业的一名普通开发人员,非常荣幸有机会开发和维护这样一个 TDengine 周边的开源小项目。希望通过这个项目可以让更多的开发人员可以更加方便便捷的使用 TDengine ,提高工作效率。也希望能够起到抛砖引玉的作用,鼓励更多的开发者加入到开源项目开发中来,大家一起来为丰富 TDengine 的周边生态做贡献。


如果你有好的idea想和TDengine一起实现,欢迎成为Contributor Family的一员。点击下方链接,加入我们!

https://www.taosdata.com/contributor/