教程介绍
教程开始前需要掌握的知识
介绍
CnosDB的快速入门教程,通过本课程可以学习CnosDB数据库管理,CnosDB的查询语法,CnosDB的schema原理,与第三方生态插件的结合使用的方法以及CnosDB的最佳实践
数据样本
示例数据(oceanic_station)是中国海洋观测站的公开数据,数据包括在2022年1月14日到4月15日期间,在两个站点XiaoMaiDao和LianYunGang上收集到的海洋观测值,这些数据每3分钟收集一次,总共87360条观测值。 请注意,air、sea、wind中包含虚拟数据,这些数据用于阐述CnosDB中的查询功能。
Docker
Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中,然后发布到任何流行的 Linux或Windows操作系统的机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口。 更多信息请查看Docker官方文档
Golang
一个开源的编程语言,可以构建简单、可靠和高效的软件。 更多信息请查看Golang官方文档
CnosDB入门
摘要
什么是时序数据?
时序数据是指时间序列数据。是按时间顺序记录的数据列,在同一数据列中的各个数据必须是同口径的,要求具有可比性。
CnosDB简介
时序数据库 - 用于处理带时间标签(按照时间的顺序变化,即时间序列化)的数据
时序数据管理系统 - 主要通过对时序数据的采集、处理和分析帮助企业实时监控企业的生产与经营过程。
- 数据是时序的,一定带有时间戳
- 数据极少有更新操作
- 数据的写入多,读取少
- 用户关注的是一段时间的趋势
- 数据是有保留期限的
- 除了存储查询外,还需要实时的计算操作
- 数据量巨大,每天很容易就会过百亿
快速开始
使用Docker启动
docker pull cnosdb/cnosdb:latest
docker run -itd -p 8086:8086 cnosdb/cnosdb:latest
导入示例数据
如何提示
bash: wget: command not found
请下载
wget
工具:apt-get update && apt-get install wget
docker ps # 查看运行中的容器
docker exec -it container_id bash # 进入容器
wget https://gist.githubusercontent.com/cnos-db/9839ac8e78e45b0ee50d2803de4acfd8/raw/818b19d0dd3c80befe636b60ee569451ac2ca4b1/oceanic_station
cnosdb-cli import --path oceanic_station # 导入数据到cnosdb
cnosdb-cli
SHOW DATABASES
USE oceanic_station
CnosQL vs SQL
-
时间序列数据在聚合场景中最有用
-
CnosDB 中的
measurement
类似于一个 SQL 中的table
-
CnosDB 中的
tag
就像 SQL 中的一个带索引的列 -
CnosDB 中的
field
就像 SQL 中的没有索引的列 -
CnosDB
points
类似于 SQL 中的行 -
CnosDB 中不需要预定义
schema
查询入门
查看所有 measurements
show measurements
计算air
中temperature
的数量
SELECT COUNT("temperature") FROM air
查看air
中的前五个值
SELECT * FROM air LIMIT 5
指定字段的标识符号
SELECT "temperature"::field,"station"::tag,"visibility"::field FROM "air" limit 10
查看measurement
的tag key
SHOW TAG KEYS FROM air
查看tag value
SHOW TAG VALUES FROM air WITH KEY = "station"
查看field key
SHOW FIELD KEYS FROM air
查看series
SHOW SERIES
函数使用
SELECT MEAN("temperature") FROM "air"
课堂问题
- 时序数据和时序数据库的关系是什么?
- 写出一条符合CnosDB格式的数据
- 查询出2022-01-14T00:00:00Z到2022-02-15T00:00:00Z期间在XiaoMaiDao水位最高的一条数据
- 查询出2022-01-14T00:00:00Z到2022-02-15T00:00:00Z期间LianYunGang每天的平均水温是多少
CnosDB 数据库管理
摘要
数据库管理
保留策略管理
数据库管理
CREATE DATABASE
语法
CREATE DATABASE <database_name> [WITH [DURATION <duration>] [REPLICATION <n>] [SHARD DURATION <duration>] [NAME <rp-name>]]
DROP DATABASE
语法
DROP DATABASE <database_name>
DROP SERIES
语法
DROP SERIES FROM <measurement_name[,measurement_name]> WHERE <tag_key>='<tag_value>'
DELETE
语法
DELETE FROM <measurement_name> WHERE [<tag_key>='<tag_value>'] | [<time interval>]
DROP MEASUREMENT
语法
DROP MEASUREMENT <measurement_name>
DROP SHARD
语法
DROP SHARD <shard_id_number>
保留策略管理
CREATE RETENTION POLICY
语法
CREATE RETENTION POLICY <rp_name> ON <database_name> DURATION <duration> REPLICATION <n> [SHARD DURATION <duration>] [DEFAULT]
ALTER RETENTION POLICY
语法
ALTER RETENTION POLICY <rp_name> ON <database_name> DURATION <duration> REPLICATION <n> SHARD DURATION <duration> DEFAULT
DROP RETENTION POLICY
语法
DROP RETENTION POLICY <rp_name> ON <database_name>
数据的增删改查
摘要
- 理解
curl
命令 - 通过Http API Endpoint写入和查询数据
理解curl
命令
curl是常用的命令行工具,用来请求Web服务器,它的名字就是客户端(Client)的URL工具的意思
创建数据库
curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"
使用curl
通过 CnosDB API 写入数据
CnosDB API是在CnosDB中写入数据的主要方式
写入数据
curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'air,station=XiaoMaiDao visibility=50,temperature=63,pressure=52 1568651760000000000'
以文件的方式写入
curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary @air_data.txt
使用curl
查询CnosDB API
CnosDB API 是在 CnosDB中查询数据的主要方式
如果需要执行查询请求,需要将GET请求发送到/query端点,将URL参数db设置为目标数据库,并将参数q设置为查询语句。还可以通过发送相同的参数作为URL参数或作为带有application/x-www-form-urlencoded的正文的一部分来使用POST请求
示例:
单条查询语句
curl -G "http://localhost:8086/query?pretty=true" --data-urlencode "db=oceanic_station" --data-urlencode "q=SELECT temperature FROM air WHERE station='XiaoMaiDao'"
返回错误
CnosDB返回JSON,查询的结果会在
rusults
数组中,如果发生错误,CnosDB会设置一个带有error
的key
curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=mydb" --data-urlencode "q=SELECT temperature FROM air WHERE station='XiaoMaiDao'"
多条查询语句
多条查询语句需要用
;
分隔
curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=oceanic_station" --data-urlencode "q=SELECT speed FROM wind WHERE station = 'XiaoMaiDao';SELECT temperature FROM air WHERE station = 'XiaoMaiDao'"
时间精度
CnosDB中的所有内容都以UTC存储和输出。默认情况下,时间戳以RFC3339格式返回,例如 2015-08-04T19:05:00Z,如果想要Unix纪元格式的时间戳,则需要在请求中包含字符串参数:epoch=[h, m, s, ms, u, ns]
curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=oceanic_station" --data-urlencode "epoch=s" --data-urlencode "q=SELECT temperature FROM air WHERE station = 'XiaoMaiDao'"
最大行限制
max-row-limit 配置选项允许用于限制返回结果最大数量,以防止CnosDB在聚合结果时耗尽内存,max-row-limit配置选项默认设置为0,该默认设置允许每个请求返回无限数量的行。 最大行限制适用于非块查询,分块查询可以返回无限数量的point
Chunking
通过设置查询字符串参数chunked=true,可以使用分块以流式批处理而不是作为单个响应返回结果。响应将按series或每10000point分块,以先发生者为准。要将最大块大小更改为不同的值,需要将查询字符串chunk_size设置为不同的值
习题练习
使用URL创建一个名为weather
的数据库,并且设置Retention Policy为7天,并且Shard Duration为1天,复制因子为1
使用Golang操作CnosDB
摘要
- 学习使用CnosDB Golang SDK 操作CnosDB
使用/ping查看cnosdb状态
func ExampleClient_Ping() {
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086",
})
if err != nil {
fmt.Println("Error creating CnosDB Client: ", err.Error())
}
defer c.Close()
_, rs, err := c.Ping(0)
fmt.Println("version:", rs)
if err != nil {
fmt.Println("Error pinging CnosDB: ", err.Error())
}
}
使用http client写入一个point
func ExampleClient_write() {
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086",
})
if err != nil {
fmt.Println("Error creating CnosDB Client: ", err.Error())
}
defer c.Close()
// Create a new point batch
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: "oceanic_station",
Precision: "s",
})
// Create a point and add to batch
tags := map[string]string{"station": "XiaoMaiDao"}
fields := map[string]interface{}{
"temperature": 67,
"visibility": 58,
}
pt, err := client.NewPoint("air", tags, fields, time.Now())
if err != nil {
fmt.Println("Error: ", err.Error())
}
bp.AddPoint(pt)
// Write the batch
c.Write(bp)
}
创建一个BatchPoint,并添加一个Point
func ExampleBatchPoints() {
// Create a new point batch
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
Database: "oceanic_station",
Precision: "s",
})
// Create a point and add to batch
tags := map[string]string{"station": "XiaoMaiDao"}
fields := map[string]interface{}{
"temperature": 67,
"visibility": 58,
}
pt, err := client.NewPoint("air", tags, fields, time.Now())
if err != nil {
fmt.Println("Error: ", err.Error())
}
bp.AddPoint(pt)
}
使用BatchPoint的setter方法
func ExampleBatchPoints_setters() {
// Create a new point batch
bp, _ := client.NewBatchPoints(client.BatchPointsConfig{})
bp.SetDatabase("oceanic_station")
bp.SetPrecision("ms")
// Create a point and add to batch
tags := map[string]string{"station": "XiaoMaiDao"}
fields := map[string]interface{}{
"temperature": 67,
"visibility": 58,
}
pt, err := client.NewPoint("air", tags, fields, time.Now())
if err != nil {
fmt.Println("Error: ", err.Error())
}
bp.AddPoint(pt)
}
创建一个point并设置时间戳
func ExamplePoint() {
tags := map[string]string{"station": "XiaoMaiDao"}
fields := map[string]interface{}{
"temperature": 67,
"visibility": 58,
}
pt, err := client.NewPoint("air", tags, fields, time.Now())
if err == nil {
fmt.Println("We created a point: ", pt.String())
}
}
创建一个没有时间戳的point
func ExamplePoint_withoutTime() {
tags := map[string]string{"station": "XiaoMaiDao"}
fields := map[string]interface{}{
"temperature": 67,
"visibility": 58,
}
pt, err := client.NewPoint("air", tags, fields)
if err == nil {
fmt.Println("We created a point w/o time: ", pt.String())
}
}
创建一个查询请求
func ExampleClient_query() {
// Make client
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://localhost:8086",
})
if err != nil {
fmt.Println("Error creating CnosDB Client: ", err.Error())
}
defer c.Close()
q := client.NewQuery("SELECT temperature FROM air limit 10", "oceanic_station", "ns")
if response, err := c.Query(q); err == nil && response.Error() == nil {
fmt.Println(response.Results)
}
}
CnosQL语法
摘要
学习CnosQL的语法,领略不一样的查询标准
基础语法
SELECT
语法
SELECT <field_key>[,<field_key>,<tag_key>] FROM <measurement_name>[,<measurement_name>]
示例
查询单个measurement中的所有tag和field
select * from air
查询单个measurement中指定的tag和field
select station, pressure from air
查询单个measurement中特定的field并进行基础运算
select (temperature * 2)+3 from air
查询多个measurement中多所有数据
select * from air, sea
查询指定保留策略下的指定measurement中的数据
select * from "oceanic_station"."autogen"."air"
WHERE
语法
SELECT_clause FROM_clause WHERE <conditional_expression> [(AND|OR) <conditional_expression> [...]]
示例
查询满足限制field value条件的数据
select * from air where temperature > 60
在WHERE
中添加基本运算
select * from air where temperature - 20 > 40
查询限制tag value条件的数据
select * from air where station = 'XiaoMaiDao'
查询既满足tag value也满足field value条件的数据
select station, temperature from air where station = 'XiaoMaiDao' and (temperature > 50 and temperature < 60)
查询满足timestamp条件的数据
select * from air where time > now() - 30d
注意以下查询语句返回的结果 select * from air where station = XiaoMaiDao select * from air where station = "XiaoMaiDao" select * from air where station = 'XiaoMaiDao'
GROUP BY
语法
SELECT_clause FROM_clause [WHERE_clause] GROUP BY [* | <tag_key>[,<tag_key]]
示例
使用tag对结果进行分组
select * from air group by station
使用time internal对结果进行分组
select mean(*) from air group by time(30m)
使用time internal对结果进行分组,添加时间限制条件
select mean(*) from air where time < '2022-04-15T16:00:00Z' group by time(30m)
插值运算
fill([linear | none | null | previous | 任意数值])
示例
select mean(temperature) from air group by time(1w) fill(linear)
select mean(temperature) from air group by time(1w) fill(none)
select mean(temperature) from air group by time(1w) fill(null)
select mean(temperature) from air group by time(1w) fill(previous)
select mean(temperature) from air group by time(1w) fill(50)
INTO
语法
SELECT_clause INTO <measurement_name> FROM_clause [WHERE_clause] [GROUP_BY_clause]
示例
重命名数据库
select * into "copy_oceanic_station"."autogen".:MEASUREMENT from "oceanic_station"."autogen"./.*/ group by *
将查询结果写入measurement
select temperature into Xia
oMaiDao_air_temperature from air where station = 'XiaoMaiDao'
将聚合结果写入指定measurement
select mean(temperature) into "temperature_mean_1h" from air where station = 'XiaoMaiDao' and time >= '2022-03-01T00:00:00Z' and time <= '2022-04-01T00:00:00Z' group by time(1h)
将多个measurement的聚合结果写入一个不同的数据库
select MEAN(*) into "other_database"."autogen".:MEASUREMENT from /.*/ where time >= '2022-03-01T00:00:00Z' and time <= '2022-04-01T00:06:00Z' group by time(1h)
配置查询结果
ORDER BY time
语法
SELECT_clause [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] ORDER BY time DESC
示例
select * from air order by time desc
LIMIT 和 SLIMIT
语法
SELECT_clause [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] LIMIT <N>
SELECT_clause [INTO_clause] FROM_clause [WHERE_clause] GROUP BY *[,time(<time_interval>)] [ORDER_BY_clause] SLIMIT <N>
示例
限制返回的数据条数
select * from air limit 3
限制返回的series个数
select mean(temperature) from air group by * slimit 1
OFFSET 和 SOFFSET
语法
SELECT_clause [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] LIMIT_clause OFFSET <N> [SLIMIT_clause]
SELECT_clause [INTO_clause] FROM_clause [WHERE_clause] GROUP BY *[,time(time_interval)] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] SLIMIT_clause SOFFSET <N>
示例
select * from air limit 1 offset 1
select mean(temperature) from air group by * slimit 1 soffset 1
Time Zone
SELECT_clause [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause] tz('<time_zone>')
select * from air limit 10 tz('Asia/Shanghai')
其他
查询会合并series
以下在计算temperature的平均值的时候会将两个station合并
select mean(temperature) from air
如果需要只计算其中一个station中的平均值
select mean(temperature) from air ?
如果需要计算其中每一个station中的平均值
select mean(temperature) from air ?
多条语句查询
select * from air limit 5;select * from sea limit 5
子查询
语法
SELECT_clause FROM ( SELECT_statement ) [...]
示例
select sum(max) from (select MAX(temperature) from air group by station)
CnosDB函数
摘要
本次分享CnosDB中函数的使用,揭秘时序数据库都在使用哪些函数进行查询。
备注:以下并不是全部的函数,如需查看其他请参考CnosDB函数
聚合函数
COUNT() # 计数
DISTINCT() # 去重
INTEGRAL() # 返回曲线下的面积
MEAN() # 均值
MEDIAN() # 中位数
MODE() # 返回出现频率最高的值
SPREAD() # 返回最大和最小值之间的差
STDDEV() # 求标准差
SUM() # 求和
选择函数
BOTTOM() #返回最小的N个field value
FIRST() #返回最早的时间戳的field value
LAST() #返回最新时间戳的field value
MAX() # 求最大值
MIN() # 求最小值
SAMPLE() # 返回N个field value的随机样本
TOP() # 返回最大的N个field value
转换函数
ABS() 返回field value的绝对值
CUMULATIVE_SUM() # 返回field value的累积总和。
DIFFERENCE() # 返回field value之间的差值。
ELAPSED() # 返回field value的时间戳之间的差值。
POW() # 返回field value的x次方。
ROUND() # 返回指定值的四舍五入后的整数。
SQRT() # 求平方根
分析函数
EXPONENTIAL_MOVING_AVERAGE() #指数加权移动平均值
SELECT EXPONENTIAL_MOVING_AVERAGE(temperature, 2) from air limit 10
预测函数
HOLT_WINTERS() # 返回N个预测的field value。
select holt_winters(mean(temperature),3,3) from air where time > '2022-01-14T16:00:00Z' and time < '2022-01-14T17:00:00Z' group by time(6m)
备份还原和下采样
摘要
本次分享CnosDB的数据备份还原功能和数据下采样
- 导入导出
- 备份还原
- 连续查询
导入导出
导出
Exports TSM files into CnosDB line protocol format.
Usage:
cnosdb-inspect export [flags]
Examples:
aaa
Flags:
--compress Compress the output
--database string Optional: the database to export
--datadir string Data storage path (default "~/.cnosdb/data")
--end string Optional: the end time to export (RFC3339 format)
-h, --help help for export
--out string Destination file to export to (default "~/.cnosdb/export")
--rp string Optional: the retention policy to export (requires -database)
--start string Optional: the start time to export (RFC3339 format)
--waldir string WAL storage path (default "~/.cnosdb/wal")
示例
cnosdb-inspect export --compress --database oceanic_station --datadir ~/.cnosdb/data --out oceanic_station.zip
导入
Import a previous database export from file. [Long]
Usage:
cnosdb-cli import [path] [Use] [flags]
Examples:
Import a previous database export from file. [Example]
Flags:
--compressed set to true if the import file is compressed
--consistency string Set write consistency level: any, one, quorum, or all. (default "all")
-h, --help help for import
--host string Host of the CnosDB instance to connect to. (default "localhost")
-p, --password string Password to login to the server. If password is not given, it's the same as using (--password="").
--path string Path to the file to import.
--port int Port of the CnosDB instance to connect to. (default 8086)
--pps int How many points per second the import will allow. By default it is zero and will not throttle importing.
--precision string Precision specifies the format of the timestamp: rfc3339,h,m,s,ms,u or ns. (default "ns")
--ssl Use https for connecting to cluster.
-u, --username string Username to login to the server.
cnosdb-cli import --compressed --consistency all --host localhost --port 8086 --precision s --path oceanic_station.zip
备份还原
备份
➜ ~ cnosdb backup --help
Creates a backup copy of specified CnosDB database(s) and saves the files to PATH (directory where backups are saved).
Usage:
cnosdb backup [flags] PATH
Examples:
cnosdb backup --start 2021-10-10T12:12:00Z
Flags:
--database string
--db string
--end string
-h, --help help for backup
--host string (default "localhost:8088")
--portable
--retention string
--rp string
--shard string
--since string
--skip-errors
--start string
示例
- 备份全部数据
cnosdb backup --portable ./backup
- 备份指定数据库
cnosdb backup --portable --db oceanic_station ./backup_oceanic_station
- 备份指定时间范围内的数据
cnosdb backup --portable --start 2022-03-28T00:00:00Z --end 2022-03-28T00:00:00Z ./backup_oceanic_station_part
还原
Uses backup copies from the specified PATH to restore databases or specific shards from CnosDB to an CnosDB instance.
Usage:
cnosdb restore [flags] PATH
Examples:
cnosdb restore
Flags:
--database string
--datadir string
--db string
-h, --help help for restore
--host string (default "localhost:8088")
--metadir string
--newdb string
--newrp string
--online
--portable
--retention string
--rp string
--shard uint
示例
- 还原到临时数据库
cnosdb restore --portable --db oceanic_station --newdb oceanic_station_new --host localhost:8088 ./backup
- 将数据写入到原有目标数据库,并删除临时数据库
SELECT * INTO oceanic_station..:MEASUREMENT FROM /.*/ GROUP BY *
DROP DATABASE oceanic_station_new
连续查询
https://www.cnosdb.com/content/cnosdb/latest/cnosql/continuous_queries.html
连续查询
注意:以下将连续查询(Continuous Queries)简称为CQ
语法
基本语法
CREATE CONTINUOUS QUERY <cq_name> ON <database_name>
BEGIN
<cq_query>
END
语法描述
CQ查询必须包含一个函数,一个INTO
子句和一个GROUP BY time()
子句:
SELECT <function[s]> INTO <destination_measurement> FROM <measurement> [WHERE <stuff>] GROUP BY time(<interval>)[,<tag_key[s]>]
在
WHERE
子句中,不需要指定时间范围,CQ查询会为语句自动匹配时间范围
基本语法示例
以下示例使用数据库transportation
中的示例数据,bus_data
中存储的数据是公交车乘客数量和投诉数量的15分钟数:
name: air
time pressure station temperature visibility
---- -------- ------- ----------- ----------
2021-08-31T16:00:00Z 78 LianYunGang 63 71
2021-08-31T16:00:00Z 75 XiaoMaiDao 79 68
2021-08-31T16:03:00Z 50 LianYunGang 52 53
2021-08-31T16:03:00Z 73 XiaoMaiDao 70 55
2021-08-31T16:06:00Z 60 LianYunGang 52 75
2021-08-31T16:06:00Z 58 XiaoMaiDao 77 79
2021-08-31T16:09:00Z 58 LianYunGang 73 65
2021-08-31T16:09:00Z 63 XiaoMaiDao 54 70
2021-08-31T16:12:00Z 50 LianYunGang 73 69
2021-08-31T16:12:00Z 73 XiaoMaiDao 77 63
...
自动采样数据
使用CQ自动从单个字段下采样数据,并将结果写入到同一个数据库的另一个measurement
中:
CREATE CONTINUOUS QUERY "cq_basic" ON "oceanic_station"
BEGIN
SELECT mean("temperature") INTO "average_air_temperatures" FROM "air" GROUP BY time(1h)
END
最终结果如下:
> SELECT * FROM "average_air_temperatures"
name: average_air_temperatures
time mean
---- ----
2021-08-31T16:00:00Z 63.65
2021-08-31T17:00:00Z 63.3
2021-08-31T18:00:00Z 65.65
2021-08-31T19:00:00Z 61.425
2021-08-31T20:00:00Z 65.775
2021-08-31T21:00:00Z 64.45
2021-08-31T22:00:00Z 65.1
2021-08-31T23:00:00Z 64.95
2021-09-01T00:00:00Z 63.525
2021-09-01T01:00:00Z 66.125
...
自动采样数据并将结果保存到另一个保留策略中
CREATE CONTINUOUS QUERY "cq_basic_rp" ON "oceanic_station"
BEGIN
SELECT mean("temperature") INTO "oceanic_station"."one_year"."average_air_temperatures_1year" FROM "air" GROUP BY time(1h)
END
最终结果如下:
> SELECT * FROM "oceanic_station"."one_year"."average_air_temperatures_1year"
name: average_air_temperatures_1year
time mean
---- ----
2021-08-31T16:00:00Z 63.65
2021-08-31T17:00:00Z 63.3
2021-08-31T18:00:00Z 65.65
2021-08-31T19:00:00Z 61.425
2021-08-31T20:00:00Z 65.775
...
使用通配符自动下采样数据
CREATE CONTINUOUS QUERY "cq_basic_br" ON "oceanic_station"
BEGIN
SELECT mean(*) INTO "downsampled_oceanic"."autogen".:MEASUREMENT FROM /.*/ GROUP BY time(168h),*
END
最终结果如下:
> SELECT * FROM "downsampled_oceanic"."autogen"."air"
name: air
time mean_pressure mean_temperature mean_visibility station
---- ------------- ---------------- --------------- -------
2021-08-26T00:00:00Z 64.5890625 64.4625 64.575 LianYunGang
2021-08-26T00:00:00Z 65.2546875 64.4765625 64.7109375 XiaoMaiDao
2021-09-02T00:00:00Z 65.06517857142858 64.80208333333333 65.0014880952381 LianYunGang
2021-09-02T00:00:00Z 64.86964285714286 64.93988095238095 64.93690476190476 XiaoMaiDao
2021-09-09T00:00:00Z 65.02410714285715 65.13988095238095 65.05684523809524 LianYunGang
2021-09-09T00:00:00Z 65.06607142857143 64.99732142857142 64.91964285714286 XiaoMaiDao
2021-09-16T00:00:00Z 65.13690476190476 64.99464285714286 65.08660714285715 LianYunGang
2021-09-16T00:00:00Z 65.13660714285714 65.09285714285714 64.95446428571428 XiaoMaiDao
2021-09-23T00:00:00Z 64.76636904761905 65.04642857142858 65.01726190476191 LianYunGang
2021-09-23T00:00:00Z 65.03333333333333 64.77708333333334 64.8202380952381 XiaoMaiDao
2021-09-30T00:00:00Z 64.1358024691358 65.01234567901234 64.20987654320987 LianYunGang
2021-09-30T00:00:00Z 64.54320987654322 64.35802469135803 65.06172839506173 XiaoMaiDao
自动采样数据并配置CQ的时间边界
CREATE CONTINUOUS QUERY "cq_basic_offset" ON "oceanic_station"
BEGIN
SELECT mean("temperature") INTO "average_air_temperatures_offset" FROM "air" GROUP BY time(1h,15m)
END
最终结果如下:
name: average_air_temperatures_offset
time mean
---- ----
2021-08-26T00:03:00Z 64.47581903276131
2021-09-02T00:03:00Z 64.87008928571429
2021-09-09T00:03:00Z 65.06755952380952
2021-09-16T00:03:00Z 65.04315476190476
2021-09-23T00:03:00Z 64.9110119047619
2021-09-30T00:03:00Z 64.775
高级语法
CREATE CONTINUOUS QUERY <cq_name> ON <database_name>
RESAMPLE EVERY <interval> FOR <interval>
BEGIN
<cq_query>
END
高级语法示例
示例数据如下:
name: sea
time station temperature
---- ------- -----------
2021-08-31T16:00:00Z LianYunGang 55
2021-08-31T16:00:00Z XiaoMaiDao 50
2021-08-31T16:03:00Z LianYunGang 59
2021-08-31T16:03:00Z XiaoMaiDao 64
2021-08-31T16:06:00Z LianYunGang 71
2021-08-31T16:06:00Z XiaoMaiDao 60
2021-08-31T16:09:00Z LianYunGang 60
2021-08-31T16:09:00Z XiaoMaiDao 62
2021-08-31T16:12:00Z LianYunGang 62
2021-08-31T16:12:00Z XiaoMaiDao 65
...
配置时间间隔
在RESAMPLE
中使用EVERY
来指明CQ的执行间隔
CREATE CONTINUOUS QUERY "cq_advanced_every" ON "oceanic_station"
RESAMPLE EVERY 30m
BEGIN
SELECT mean("temperature") INTO "average_sea_temperatures" FROM "sea" GROUP BY time(1h)
END
最终结果如下:
name: average_sea_temperatures
time mean
---- ----
2021-08-31T16:00:00Z 63.025
2021-08-31T17:00:00Z 63.975
2021-08-31T18:00:00Z 64.45
2021-08-31T19:00:00Z 64.025
2021-08-31T20:00:00Z 64.55
2021-08-31T21:00:00Z 63.075
2021-08-31T22:00:00Z 66.15
2021-08-31T23:00:00Z 64.625
2021-09-01T00:00:00Z 63.025
2021-09-01T01:00:00Z 67.75
...
配置CQ的重采样时间范围
在RESAMPLE
中使用FOR
来指明CQ的时间间隔的长度
CREATE CONTINUOUS QUERY "cq_advanced_for" ON "oceanic_station"
RESAMPLE FOR 1h
BEGIN
SELECT mean("temperature") INTO "average_sea_temperatures" FROM "sea" GROUP BY time(30m)
END
最终结果如下:
name: average_sea_temperatures
time mean
---- ----
2021-08-31T16:00:00Z 62.6
2021-08-31T16:30:00Z 63.45
2021-08-31T17:00:00Z 65.85
2021-08-31T17:30:00Z 62.1
2021-08-31T18:00:00Z 64.45
2021-08-31T18:30:00Z 64.45
2021-08-31T19:00:00Z 64.45
2021-08-31T19:30:00Z 63.6
2021-08-31T20:00:00Z 65.8
2021-08-31T20:30:00Z 63.3
...
配置执行间隔和CQ时间范围
在RESAMPLE
子句中使用EVERY
和FOR
来指定CQ的执行间隔和CQ的时间范围长度。
CREATE CONTINUOUS QUERY "cq_advanced_every_for" ON "oceanic_station"
RESAMPLE EVERY 1h FOR 90m
BEGIN
SELECT mean("temperature") INTO "average_sea_temperatures" FROM "sea" GROUP BY time(90m)
END
最终结果如下:
name: average_sea_temperatures
time mean
---- ----
2021-08-31T15:00:00Z 62.6
2021-08-31T16:00:00Z 62.6
2021-08-31T16:30:00Z 63.8
2021-08-31T17:00:00Z 65.85
2021-08-31T17:30:00Z 62.1
2021-08-31T18:00:00Z 64.45
2021-08-31T18:30:00Z 64.45
2021-08-31T19:00:00Z 64.45
2021-08-31T19:30:00Z 64.23333333333333
2021-08-31T20:00:00Z 65.8
...
配置CQ的时间范围并填充空值
使用FOR
间隔和fill()
来更改不含数据的时间间隔值。请注意,至少有一个数据点必须在fill()
运行的FOR
间隔内。 如果没有数据落在FOR
间隔内,则CQ不会将任何数据写入目标measurement
。
CREATE CONTINUOUS QUERY "cq_advanced_for_fill" ON "oceanic_station"
RESAMPLE FOR 2h
BEGIN
SELECT mean("temperature") INTO "average_sea_temperatures" FROM "sea" GROUP BY time(1h) fill(1000)
END
最终结果如下:
...
2021-09-30T01:30:00Z 64.35
2021-09-30T02:00:00Z 63.8
2021-09-30T02:30:00Z 64.95
2021-09-30T03:00:00Z 67.225
2021-09-30T03:30:00Z 66.75
2021-09-30T04:00:00Z 54
2021-09-30T05:00:00Z 1000
2021-09-30T06:00:00Z 1000
2021-09-30T07:00:00Z 1000
2021-09-30T08:00:00Z 1000
2021-09-30T09:00:00Z 1000
...
管理CQ
CQ不能
update
,只能drop
和create
列出所有CQ
SHOW CONTINUOUS QUERIES
删除CQ
DROP CONTINUOUS QUERY <cq_name> ON <database_name>
CnosDB分布式部署
摘要:本次分享内容为往期内容的回顾以及新出炉的分布式版本
- 回顾CnosDB基本概念
- CnosDB分布式部署
回顾CnosDB基本概念
一些基本概念
series
point
measurement
tag
tag key
tag value
tag set
field
field key
field value
field set
timestamp
retention policy
集群角色
cnosdb
CnosDB的主进程,可以单独启动对外提供服务,也可以与cnosdb-meta公共启动对外提供服务
cnosdb-meta
CnosDB的元数据管理节点,用于协调集群,必需与cnosdb搭配使用,cnosdb-meta中只存储节点信息、节点上的分片信息,数据库,保留策略,订阅信息以及权限和角色 同时cnosdb-meta节点必需为3个及以上的奇数个,因为其使用raft协议来维护节点的一致性,它使用选举机制保证集群一定会存在一个leader节点,奇数个节点能够保证被选举的节点能够得到大多数选票,从而成为leader节点
部署架构
最小集群部署架构请参考一下图片,由3个cnosdb-meta节点和2个cnosdb节点组成 cnosdb节点的数量必需被保留策略中复制因子数量整除,如果保留策略的复制因子数量是2,cnosdb节点的数量必需为2,4,6,..., 如果保留策略的复制因子数量是3,cnosdb节点数量必需为3,6,9
CnosDB分布式部署
安装cnosdb-meta,并将它们加入集群
安装cnosdb,并将它们加入集群
CnosDB最佳实践
摘要:我们应该如何高效使用CnosDB
架构设计
- tag是用来加速查询的,但是不能参与计算
- field用来计算的,不能用来分组
- 不建议把tag拼在一起用作measurement
- 举例:如果对数值进行分组,可以在为指定范围的数值添加tag
- 在一个表当中,什么样的数据需要用来当作tag?
- 在一个表当中,什么样的数据需要用来当作field?
- 下面的这个表格应该如何设计?
time | region | deviceid | temperature | humidity | pressure | winspeed | windir | rainfall |
---|---|---|---|---|---|---|---|---|
2022-03-05T12:00:00Z | Beijing | ATAE-0753 | 32.5 | 56.7 | 1025 | 1.2 | 76.6 | 3.5 |
写入优化
- 批量写入
- 对数据进行排序
存储设置和下采样
- wal和data目录分开存储
- 如何正确下采样