教程介绍

教程开始前需要掌握的知识

介绍

CnosDB的快速入门教程,通过本课程可以学习CnosDB数据库管理,CnosDB的查询语法,CnosDB的schema原理,与第三方生态插件的结合使用的方法以及CnosDB的最佳实践

数据样本

示例数据(oceanic_station)是中国海洋观测站的公开数据,数据包括在2022年1月14日到4月15日期间,在两个站点XiaoMaiDao和LianYunGang上收集到的海洋观测值,这些数据每3分钟收集一次,总共87360条观测值。 请注意,air、sea、wind中包含虚拟数据,这些数据用于阐述CnosDB中的查询功能。

Docker

Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中,然后发布到任何流行的 Linux或Windows操作系统的机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口。 更多信息请查看Docker官方文档

Golang

一个开源的编程语言,可以构建简单、可靠和高效的软件。 更多信息请查看Golang官方文档

CnosDB入门

摘要

什么是时序数据?

时序数据是指时间序列数据。是按时间顺序记录的数据列,在同一数据列中的各个数据必须是同口径的,要求具有可比性。

场景

天气

CnosDB简介

时序数据库 - 用于处理带时间标签(按照时间的顺序变化,即时间序列化)的数据

时序数据管理系统 - 主要通过对时序数据的采集、处理和分析帮助企业实时监控企业的生产与经营过程。

  • 数据是时序的,一定带有时间戳
  • 数据极少有更新操作
  • 数据的写入多,读取少
  • 用户关注的是一段时间的趋势
  • 数据是有保留期限的
  • 除了存储查询外,还需要实时的计算操作
  • 数据量巨大,每天很容易就会过百亿

快速开始

使用Docker启动

docker pull cnosdb/cnosdb:latest
docker run -itd -p 8086:8086 cnosdb/cnosdb:latest

导入示例数据

如何提示bash: wget: command not found

请下载wget工具: apt-get update && apt-get install wget


docker ps # 查看运行中的容器

docker exec -it container_id bash # 进入容器

wget https://gist.githubusercontent.com/cnos-db/9839ac8e78e45b0ee50d2803de4acfd8/raw/818b19d0dd3c80befe636b60ee569451ac2ca4b1/oceanic_station

cnosdb-cli import --path oceanic_station # 导入数据到cnosdb

cnosdb-cli

SHOW DATABASES

USE oceanic_station

CnosQL vs SQL

  • 时间序列数据在聚合场景中最有用

  • CnosDB 中的measurement类似于一个 SQL 中的table

  • CnosDB 中的tag就像 SQL 中的一个带索引的列

  • CnosDB 中的field就像 SQL 中的没有索引的列

  • CnosDBpoints类似于 SQL 中的行

  • CnosDB 中不需要预定义schema

查询入门

查看所有 measurements

show measurements

计算airtemperature的数量

SELECT COUNT("temperature") FROM air

查看air中的前五个值

SELECT * FROM air LIMIT 5

指定字段的标识符号

SELECT "temperature"::field,"station"::tag,"visibility"::field FROM "air" limit 10

查看measurement的tag key

SHOW TAG KEYS FROM air

查看tag value

SHOW TAG VALUES FROM air WITH KEY = "station"

查看field key

SHOW FIELD KEYS FROM air

查看series

SHOW SERIES

函数使用

更多

SELECT MEAN("temperature") FROM "air"

课堂问题

  1. 时序数据和时序数据库的关系是什么?
  2. 写出一条符合CnosDB格式的数据
  3. 查询出2022-01-14T00:00:00Z到2022-02-15T00:00:00Z期间在XiaoMaiDao水位最高的一条数据
  4. 查询出2022-01-14T00:00:00Z到2022-02-15T00:00:00Z期间LianYunGang每天的平均水温是多少

CnosDB 数据库管理

摘要

数据库管理

保留策略管理

数据库管理

CREATE DATABASE

语法

CREATE DATABASE <database_name> [WITH [DURATION <duration>] [REPLICATION <n>] [SHARD DURATION <duration>] [NAME <rp-name>]]

DROP DATABASE

语法

DROP DATABASE <database_name>

DROP SERIES

语法

DROP SERIES FROM <measurement_name[,measurement_name]> WHERE <tag_key>='<tag_value>'

DELETE

语法

DELETE FROM <measurement_name> WHERE [<tag_key>='<tag_value>'] | [<time interval>]

DROP MEASUREMENT

语法

DROP MEASUREMENT <measurement_name>

DROP SHARD

语法

DROP SHARD <shard_id_number>

保留策略管理

CREATE RETENTION POLICY

语法

CREATE RETENTION POLICY <rp_name> ON <database_name> DURATION <duration> REPLICATION <n> [SHARD DURATION <duration>] [DEFAULT]

ALTER RETENTION POLICY

语法

ALTER RETENTION POLICY <rp_name> ON <database_name> DURATION <duration> REPLICATION <n> SHARD DURATION <duration> DEFAULT

DROP RETENTION POLICY

语法

DROP RETENTION POLICY <rp_name> ON <database_name>

数据的增删改查

摘要

  • 理解curl命令
  • 通过Http API Endpoint写入和查询数据

理解curl命令

curl是常用的命令行工具,用来请求Web服务器,它的名字就是客户端(Client)的URL工具的意思

创建数据库

curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"

使用curl通过 CnosDB API 写入数据

CnosDB API是在CnosDB中写入数据的主要方式

写入数据

curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'air,station=XiaoMaiDao visibility=50,temperature=63,pressure=52 1568651760000000000'

以文件的方式写入

curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary @air_data.txt

使用curl查询CnosDB API

CnosDB API 是在 CnosDB中查询数据的主要方式

如果需要执行查询请求,需要将GET请求发送到/query端点,将URL参数db设置为目标数据库,并将参数q设置为查询语句。还可以通过发送相同的参数作为URL参数或作为带有application/x-www-form-urlencoded的正文的一部分来使用POST请求

示例:

单条查询语句

curl -G "http://localhost:8086/query?pretty=true" --data-urlencode "db=oceanic_station" --data-urlencode "q=SELECT temperature FROM air WHERE station='XiaoMaiDao'"

返回错误

CnosDB返回JSON,查询的结果会在rusults数组中,如果发生错误,CnosDB会设置一个带有error的key

curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=mydb" --data-urlencode "q=SELECT temperature FROM air WHERE station='XiaoMaiDao'"

多条查询语句

多条查询语句需要用;分隔

curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=oceanic_station" --data-urlencode "q=SELECT speed FROM wind WHERE station = 'XiaoMaiDao';SELECT temperature FROM air WHERE station = 'XiaoMaiDao'"

时间精度

CnosDB中的所有内容都以UTC存储和输出。默认情况下,时间戳以RFC3339格式返回,例如 2015-08-04T19:05:00Z,如果想要Unix纪元格式的时间戳,则需要在请求中包含字符串参数:epoch=[h, m, s, ms, u, ns]

curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=oceanic_station" --data-urlencode "epoch=s" --data-urlencode "q=SELECT temperature FROM air WHERE station = 'XiaoMaiDao'"

最大行限制

max-row-limit 配置选项允许用于限制返回结果最大数量,以防止CnosDB在聚合结果时耗尽内存,max-row-limit配置选项默认设置为0,该默认设置允许每个请求返回无限数量的行。 最大行限制适用于非块查询,分块查询可以返回无限数量的point

Chunking

通过设置查询字符串参数chunked=true,可以使用分块以流式批处理而不是作为单个响应返回结果。响应将按series或每10000point分块,以先发生者为准。要将最大块大小更改为不同的值,需要将查询字符串chunk_size设置为不同的值

习题练习

使用URL创建一个名为weather的数据库,并且设置Retention Policy为7天,并且Shard Duration为1天,复制因子为1

使用Golang操作CnosDB

摘要

  • 学习使用CnosDB Golang SDK 操作CnosDB

使用/ping查看cnosdb状态

func ExampleClient_Ping() {

	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr: "http://localhost:8086",
	})
	if err != nil {
		fmt.Println("Error creating CnosDB Client: ", err.Error())
	}
	defer c.Close()

	_, rs, err := c.Ping(0)
	fmt.Println("version:", rs)
	if err != nil {
		fmt.Println("Error pinging CnosDB: ", err.Error())
	}
}

使用http client写入一个point

func ExampleClient_write() {
	// Make client
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr: "http://localhost:8086",
	})
	if err != nil {
		fmt.Println("Error creating CnosDB Client: ", err.Error())
	}
	defer c.Close()

	// Create a new point batch
	bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
		Database:  "oceanic_station",
		Precision: "s",
	})

	// Create a point and add to batch
	tags := map[string]string{"station": "XiaoMaiDao"}
	fields := map[string]interface{}{
		"temperature": 67,
		"visibility":  58,
	}
	pt, err := client.NewPoint("air", tags, fields, time.Now())
	if err != nil {
		fmt.Println("Error: ", err.Error())
	}
	bp.AddPoint(pt)

	// Write the batch
	c.Write(bp)
}

创建一个BatchPoint,并添加一个Point

func ExampleBatchPoints() {
	// Create a new point batch
	bp, _ := client.NewBatchPoints(client.BatchPointsConfig{
		Database:  "oceanic_station",
		Precision: "s",
	})

	// Create a point and add to batch
	tags := map[string]string{"station": "XiaoMaiDao"}
	fields := map[string]interface{}{
		"temperature": 67,
		"visibility":  58,
	}
	pt, err := client.NewPoint("air", tags, fields, time.Now())
	if err != nil {
		fmt.Println("Error: ", err.Error())
	}
	bp.AddPoint(pt)
}

使用BatchPoint的setter方法

func ExampleBatchPoints_setters() {
	// Create a new point batch
	bp, _ := client.NewBatchPoints(client.BatchPointsConfig{})
	bp.SetDatabase("oceanic_station")
	bp.SetPrecision("ms")

	// Create a point and add to batch
	tags := map[string]string{"station": "XiaoMaiDao"}
	fields := map[string]interface{}{
		"temperature": 67,
		"visibility":  58,
	}
	pt, err := client.NewPoint("air", tags, fields, time.Now())
	if err != nil {
		fmt.Println("Error: ", err.Error())
	}
	bp.AddPoint(pt)
}

创建一个point并设置时间戳

func ExamplePoint() {
	tags := map[string]string{"station": "XiaoMaiDao"}
	fields := map[string]interface{}{
		"temperature": 67,
		"visibility":  58,
	}
	pt, err := client.NewPoint("air", tags, fields, time.Now())
	if err == nil {
		fmt.Println("We created a point: ", pt.String())
	}
}

创建一个没有时间戳的point

func ExamplePoint_withoutTime() {
	tags := map[string]string{"station": "XiaoMaiDao"}
	fields := map[string]interface{}{
		"temperature": 67,
		"visibility":  58,
	}
	pt, err := client.NewPoint("air", tags, fields)
	if err == nil {
		fmt.Println("We created a point w/o time: ", pt.String())
	}
}

创建一个查询请求

func ExampleClient_query() {
	// Make client
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr: "http://localhost:8086",
	})
	if err != nil {
		fmt.Println("Error creating CnosDB Client: ", err.Error())
	}
	defer c.Close()

	q := client.NewQuery("SELECT temperature FROM air limit 10", "oceanic_station", "ns")
	if response, err := c.Query(q); err == nil && response.Error() == nil {
		fmt.Println(response.Results)
	}
}

CnosQL语法

摘要

学习CnosQL的语法,领略不一样的查询标准

基础语法

配置查询结果

其他

基础语法

SELECT

语法

SELECT <field_key>[,<field_key>,<tag_key>] FROM <measurement_name>[,<measurement_name>]

示例

查询单个measurement中的所有tag和field

select * from air

查询单个measurement中指定的tag和field

select station, pressure from air

查询单个measurement中特定的field并进行基础运算

select (temperature * 2)+3 from air

查询多个measurement中多所有数据

select * from air, sea

查询指定保留策略下的指定measurement中的数据

select * from "oceanic_station"."autogen"."air"

WHERE

语法

SELECT_clause FROM_clause WHERE <conditional_expression> [(AND|OR) <conditional_expression> [...]]

示例

查询满足限制field value条件的数据

select * from air where temperature > 60

WHERE中添加基本运算

select * from air where temperature - 20 > 40

查询限制tag value条件的数据

select * from air where station = 'XiaoMaiDao'

查询既满足tag value也满足field value条件的数据

select station, temperature from air where station = 'XiaoMaiDao' and (temperature > 50 and temperature < 60)
查询满足timestamp条件的数据
select * from air where time > now() - 30d

注意以下查询语句返回的结果 select * from air where station = XiaoMaiDao select * from air where station = "XiaoMaiDao" select * from air where station = 'XiaoMaiDao'

GROUP BY

语法

SELECT_clause FROM_clause [WHERE_clause] GROUP BY [* | <tag_key>[,<tag_key]]

示例

使用tag对结果进行分组

select * from air group by station

使用time internal对结果进行分组

select mean(*) from air group by time(30m)

使用time internal对结果进行分组,添加时间限制条件

select mean(*) from air where time < '2022-04-15T16:00:00Z' group by time(30m)

插值运算

fill([linear | none | null | previous | 任意数值])

示例

select mean(temperature) from air group by time(1w) fill(linear)
select mean(temperature) from air group by time(1w) fill(none)
select mean(temperature) from air group by time(1w) fill(null)
select mean(temperature) from air group by time(1w) fill(previous)
select mean(temperature) from air group by time(1w) fill(50)

INTO

语法

SELECT_clause INTO <measurement_name> FROM_clause [WHERE_clause] [GROUP_BY_clause]

示例

重命名数据库

select * into "copy_oceanic_station"."autogen".:MEASUREMENT from "oceanic_station"."autogen"./.*/ group by *

将查询结果写入measurement

select temperature into Xia
    oMaiDao_air_temperature from air where station = 'XiaoMaiDao'

将聚合结果写入指定measurement

select mean(temperature) into "temperature_mean_1h" from air where station = 'XiaoMaiDao' and time >= '2022-03-01T00:00:00Z' and time <= '2022-04-01T00:00:00Z' group by time(1h)

将多个measurement的聚合结果写入一个不同的数据库

select MEAN(*) into "other_database"."autogen".:MEASUREMENT from /.*/ where time >= '2022-03-01T00:00:00Z' and time <= '2022-04-01T00:06:00Z' group by time(1h)

配置查询结果

ORDER BY time

语法

SELECT_clause [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] ORDER BY time DESC

示例

select * from air order by time desc

LIMIT 和 SLIMIT

语法

SELECT_clause [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] LIMIT <N>
SELECT_clause [INTO_clause] FROM_clause [WHERE_clause] GROUP BY *[,time(<time_interval>)] [ORDER_BY_clause] SLIMIT <N>

示例

限制返回的数据条数

select * from air limit 3

限制返回的series个数

select mean(temperature) from air group by * slimit 1

OFFSET 和 SOFFSET

语法

SELECT_clause [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] LIMIT_clause OFFSET <N> [SLIMIT_clause]
SELECT_clause [INTO_clause] FROM_clause [WHERE_clause] GROUP BY *[,time(time_interval)] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] SLIMIT_clause SOFFSET <N>

示例

select * from air limit 1 offset 1
select mean(temperature) from air group by * slimit 1 soffset 1

Time Zone

SELECT_clause [INTO_clause] FROM_clause [WHERE_clause] [GROUP_BY_clause] [ORDER_BY_clause] [LIMIT_clause] [OFFSET_clause] [SLIMIT_clause] [SOFFSET_clause] tz('<time_zone>')
select * from air limit 10 tz('Asia/Shanghai')

其他

查询会合并series

以下在计算temperature的平均值的时候会将两个station合并

select mean(temperature) from air

如果需要只计算其中一个station中的平均值

select mean(temperature) from air ?

如果需要计算其中每一个station中的平均值

select mean(temperature) from air ?

多条语句查询

select * from air limit 5;select * from sea limit 5

子查询

语法

SELECT_clause FROM ( SELECT_statement ) [...]

示例

select sum(max) from (select MAX(temperature) from air group by station)

CnosDB函数

摘要

本次分享CnosDB中函数的使用,揭秘时序数据库都在使用哪些函数进行查询。

聚合函数

选择函数

转换函数

分析函数

预测函数

备注:以下并不是全部的函数,如需查看其他请参考CnosDB函数

聚合函数

COUNT() # 计数
DISTINCT() # 去重
INTEGRAL() # 返回曲线下的面积
MEAN() # 均值
MEDIAN() # 中位数
MODE() # 返回出现频率最高的值
SPREAD() # 返回最大和最小值之间的差
STDDEV() # 求标准差
SUM() # 求和

选择函数

BOTTOM() #返回最小的N个field value
FIRST() #返回最早的时间戳的field value
LAST() #返回最新时间戳的field value
MAX() # 求最大值
MIN() # 求最小值
SAMPLE() # 返回N个field value的随机样本
TOP() # 返回最大的N个field value

转换函数

ABS() 返回field value的绝对值
CUMULATIVE_SUM() # 返回field value的累积总和。
DIFFERENCE() # 返回field value之间的差值。
ELAPSED() # 返回field value的时间戳之间的差值。
POW() # 返回field value的x次方。
ROUND() # 返回指定值的四舍五入后的整数。
SQRT() # 求平方根

分析函数

EXPONENTIAL_MOVING_AVERAGE() #指数加权移动平均值

SELECT EXPONENTIAL_MOVING_AVERAGE(temperature, 2) from air limit 10

预测函数

HOLT_WINTERS() # 返回N个预测的field value。

select holt_winters(mean(temperature),3,3) from air where time > '2022-01-14T16:00:00Z' and time < '2022-01-14T17:00:00Z' group by time(6m)

备份还原和下采样

摘要

本次分享CnosDB的数据备份还原功能和数据下采样

  • 导入导出
  • 备份还原
  • 连续查询

导入导出

导出

Exports TSM files into CnosDB line protocol format.

Usage:
  cnosdb-inspect export [flags]

Examples:
aaa


Flags:
      --compress          Compress the output
      --database string   Optional: the database to export
      --datadir string    Data storage path (default "~/.cnosdb/data")
      --end string        Optional: the end time to export (RFC3339 format)
  -h, --help              help for export
      --out string        Destination file to export to (default "~/.cnosdb/export")
      --rp string         Optional: the retention policy to export (requires -database)
      --start string      Optional: the start time to export (RFC3339 format)
      --waldir string     WAL storage path (default "~/.cnosdb/wal")

示例

cnosdb-inspect export --compress --database oceanic_station  --datadir ~/.cnosdb/data --out oceanic_station.zip

导入

Import a previous database export from file. [Long]

Usage:
  cnosdb-cli import [path] [Use] [flags]

Examples:
Import a previous database export from file. [Example]

Flags:
      --compressed           set to true if the import file is compressed
      --consistency string   Set write consistency level: any, one, quorum, or all. (default "all")
  -h, --help                 help for import
      --host string          Host of the CnosDB instance to connect to. (default "localhost")
  -p, --password string      Password to login to the server. If password is not given, it's the same as using (--password="").
      --path string          Path to the file to import.
      --port int             Port of the CnosDB instance to connect to. (default 8086)
      --pps int              How many points per second the import will allow.  By default it is zero and will not throttle importing.
      --precision string     Precision specifies the format of the timestamp:  rfc3339,h,m,s,ms,u or ns. (default "ns")
      --ssl                  Use https for connecting to cluster.
  -u, --username string      Username to login to the server.
cnosdb-cli import --compressed --consistency all --host localhost --port 8086 --precision s --path oceanic_station.zip

备份还原

备份

➜  ~ cnosdb backup --help
Creates a backup copy of specified CnosDB database(s) and saves the files to PATH (directory where backups are saved).

Usage:
  cnosdb backup [flags] PATH

Examples:
  cnosdb backup --start 2021-10-10T12:12:00Z

Flags:
      --database string
      --db string
      --end string
  -h, --help               help for backup
      --host string         (default "localhost:8088")
      --portable
      --retention string
      --rp string
      --shard string
      --since string
      --skip-errors
      --start string

示例

  1. 备份全部数据
cnosdb backup --portable ./backup
  1. 备份指定数据库
cnosdb backup --portable --db oceanic_station ./backup_oceanic_station
  1. 备份指定时间范围内的数据
cnosdb backup --portable --start 2022-03-28T00:00:00Z --end 2022-03-28T00:00:00Z ./backup_oceanic_station_part

还原

Uses backup copies from the specified PATH to restore databases or specific shards from CnosDB to an CnosDB instance.

Usage:
  cnosdb restore [flags] PATH

Examples:
  cnosdb restore

Flags:
      --database string
      --datadir string
      --db string
  -h, --help               help for restore
      --host string         (default "localhost:8088")
      --metadir string
      --newdb string
      --newrp string
      --online
      --portable
      --retention string
      --rp string
      --shard uint

示例

  1. 还原到临时数据库
cnosdb restore --portable --db oceanic_station --newdb oceanic_station_new --host localhost:8088 ./backup
  1. 将数据写入到原有目标数据库,并删除临时数据库
SELECT * INTO oceanic_station..:MEASUREMENT FROM /.*/ GROUP BY *

DROP DATABASE oceanic_station_new

连续查询

https://www.cnosdb.com/content/cnosdb/latest/cnosql/continuous_queries.html

连续查询

注意:以下将连续查询(Continuous Queries)简称为CQ

语法

基本语法

CREATE CONTINUOUS QUERY <cq_name> ON <database_name>
BEGIN
  <cq_query>
END

语法描述

CQ查询必须包含一个函数,一个INTO子句和一个GROUP BY time()子句:

SELECT <function[s]> INTO <destination_measurement> FROM <measurement> [WHERE <stuff>] GROUP BY time(<interval>)[,<tag_key[s]>]

WHERE子句中,不需要指定时间范围,CQ查询会为语句自动匹配时间范围

基本语法示例

以下示例使用数据库transportation中的示例数据,bus_data中存储的数据是公交车乘客数量和投诉数量的15分钟数:

name: air
time                 pressure station     temperature visibility
----                 -------- -------     ----------- ----------
2021-08-31T16:00:00Z 78       LianYunGang 63          71
2021-08-31T16:00:00Z 75       XiaoMaiDao  79          68
2021-08-31T16:03:00Z 50       LianYunGang 52          53
2021-08-31T16:03:00Z 73       XiaoMaiDao  70          55
2021-08-31T16:06:00Z 60       LianYunGang 52          75
2021-08-31T16:06:00Z 58       XiaoMaiDao  77          79
2021-08-31T16:09:00Z 58       LianYunGang 73          65
2021-08-31T16:09:00Z 63       XiaoMaiDao  54          70
2021-08-31T16:12:00Z 50       LianYunGang 73          69
2021-08-31T16:12:00Z 73       XiaoMaiDao  77          63
...

自动采样数据

使用CQ自动从单个字段下采样数据,并将结果写入到同一个数据库的另一个measurement中:

CREATE CONTINUOUS QUERY "cq_basic" ON "oceanic_station"
BEGIN
  SELECT mean("temperature") INTO "average_air_temperatures" FROM "air" GROUP BY time(1h)
END

最终结果如下:

  > SELECT * FROM "average_air_temperatures"
name: average_air_temperatures
time                 mean
----                 ----
2021-08-31T16:00:00Z 63.65
2021-08-31T17:00:00Z 63.3
2021-08-31T18:00:00Z 65.65
2021-08-31T19:00:00Z 61.425
2021-08-31T20:00:00Z 65.775
2021-08-31T21:00:00Z 64.45
2021-08-31T22:00:00Z 65.1
2021-08-31T23:00:00Z 64.95
2021-09-01T00:00:00Z 63.525
2021-09-01T01:00:00Z 66.125
...

自动采样数据并将结果保存到另一个保留策略中

CREATE CONTINUOUS QUERY "cq_basic_rp" ON "oceanic_station"
BEGIN
  SELECT mean("temperature") INTO "oceanic_station"."one_year"."average_air_temperatures_1year" FROM "air" GROUP BY time(1h)
END

最终结果如下:

> SELECT * FROM "oceanic_station"."one_year"."average_air_temperatures_1year"
name: average_air_temperatures_1year
time                 mean
----                 ----
2021-08-31T16:00:00Z 63.65
2021-08-31T17:00:00Z 63.3
2021-08-31T18:00:00Z 65.65
2021-08-31T19:00:00Z 61.425
2021-08-31T20:00:00Z 65.775
...

使用通配符自动下采样数据

CREATE CONTINUOUS QUERY "cq_basic_br" ON "oceanic_station"
BEGIN
  SELECT mean(*) INTO "downsampled_oceanic"."autogen".:MEASUREMENT FROM /.*/ GROUP BY time(168h),*
END

最终结果如下:

> SELECT * FROM "downsampled_oceanic"."autogen"."air"
name: air
time                 mean_pressure     mean_temperature  mean_visibility   station
----                 -------------     ----------------  ---------------   -------
2021-08-26T00:00:00Z 64.5890625        64.4625           64.575            LianYunGang
2021-08-26T00:00:00Z 65.2546875        64.4765625        64.7109375        XiaoMaiDao
2021-09-02T00:00:00Z 65.06517857142858 64.80208333333333 65.0014880952381  LianYunGang
2021-09-02T00:00:00Z 64.86964285714286 64.93988095238095 64.93690476190476 XiaoMaiDao
2021-09-09T00:00:00Z 65.02410714285715 65.13988095238095 65.05684523809524 LianYunGang
2021-09-09T00:00:00Z 65.06607142857143 64.99732142857142 64.91964285714286 XiaoMaiDao
2021-09-16T00:00:00Z 65.13690476190476 64.99464285714286 65.08660714285715 LianYunGang
2021-09-16T00:00:00Z 65.13660714285714 65.09285714285714 64.95446428571428 XiaoMaiDao
2021-09-23T00:00:00Z 64.76636904761905 65.04642857142858 65.01726190476191 LianYunGang
2021-09-23T00:00:00Z 65.03333333333333 64.77708333333334 64.8202380952381  XiaoMaiDao
2021-09-30T00:00:00Z 64.1358024691358  65.01234567901234 64.20987654320987 LianYunGang
2021-09-30T00:00:00Z 64.54320987654322 64.35802469135803 65.06172839506173 XiaoMaiDao

自动采样数据并配置CQ的时间边界

CREATE CONTINUOUS QUERY "cq_basic_offset" ON "oceanic_station"
BEGIN
  SELECT mean("temperature") INTO "average_air_temperatures_offset" FROM "air" GROUP BY time(1h,15m)
END

最终结果如下:

name: average_air_temperatures_offset
time                 mean
----                 ----
2021-08-26T00:03:00Z 64.47581903276131
2021-09-02T00:03:00Z 64.87008928571429
2021-09-09T00:03:00Z 65.06755952380952
2021-09-16T00:03:00Z 65.04315476190476
2021-09-23T00:03:00Z 64.9110119047619
2021-09-30T00:03:00Z 64.775

高级语法

CREATE CONTINUOUS QUERY <cq_name> ON <database_name>
RESAMPLE EVERY <interval> FOR <interval>
BEGIN
  <cq_query>
END

高级语法示例

示例数据如下:

name: sea
time                 station     temperature
----                 -------     -----------
2021-08-31T16:00:00Z LianYunGang 55
2021-08-31T16:00:00Z XiaoMaiDao  50
2021-08-31T16:03:00Z LianYunGang 59
2021-08-31T16:03:00Z XiaoMaiDao  64
2021-08-31T16:06:00Z LianYunGang 71
2021-08-31T16:06:00Z XiaoMaiDao  60
2021-08-31T16:09:00Z LianYunGang 60
2021-08-31T16:09:00Z XiaoMaiDao  62
2021-08-31T16:12:00Z LianYunGang 62
2021-08-31T16:12:00Z XiaoMaiDao  65
...

配置时间间隔

RESAMPLE中使用EVERY来指明CQ的执行间隔

CREATE CONTINUOUS QUERY "cq_advanced_every" ON "oceanic_station"
RESAMPLE EVERY 30m
BEGIN
  SELECT mean("temperature") INTO "average_sea_temperatures" FROM "sea" GROUP BY time(1h)
END

最终结果如下:

name: average_sea_temperatures
time                 mean
----                 ----
2021-08-31T16:00:00Z 63.025
2021-08-31T17:00:00Z 63.975
2021-08-31T18:00:00Z 64.45
2021-08-31T19:00:00Z 64.025
2021-08-31T20:00:00Z 64.55
2021-08-31T21:00:00Z 63.075
2021-08-31T22:00:00Z 66.15
2021-08-31T23:00:00Z 64.625
2021-09-01T00:00:00Z 63.025
2021-09-01T01:00:00Z 67.75
...

配置CQ的重采样时间范围

RESAMPLE中使用FOR来指明CQ的时间间隔的长度

CREATE CONTINUOUS QUERY "cq_advanced_for" ON "oceanic_station"
RESAMPLE FOR 1h
BEGIN
  SELECT mean("temperature") INTO "average_sea_temperatures" FROM "sea" GROUP BY time(30m)
END

最终结果如下:

name: average_sea_temperatures
time                 mean
----                 ----
2021-08-31T16:00:00Z 62.6
2021-08-31T16:30:00Z 63.45
2021-08-31T17:00:00Z 65.85
2021-08-31T17:30:00Z 62.1
2021-08-31T18:00:00Z 64.45
2021-08-31T18:30:00Z 64.45
2021-08-31T19:00:00Z 64.45
2021-08-31T19:30:00Z 63.6
2021-08-31T20:00:00Z 65.8
2021-08-31T20:30:00Z 63.3
...

配置执行间隔和CQ时间范围

RESAMPLE子句中使用EVERYFOR来指定CQ的执行间隔和CQ的时间范围长度。

CREATE CONTINUOUS QUERY "cq_advanced_every_for" ON "oceanic_station"
RESAMPLE EVERY 1h FOR 90m
BEGIN
  SELECT mean("temperature") INTO "average_sea_temperatures" FROM "sea" GROUP BY time(90m)
END

最终结果如下:

name: average_sea_temperatures
time                 mean
----                 ----
2021-08-31T15:00:00Z 62.6
2021-08-31T16:00:00Z 62.6
2021-08-31T16:30:00Z 63.8
2021-08-31T17:00:00Z 65.85
2021-08-31T17:30:00Z 62.1
2021-08-31T18:00:00Z 64.45
2021-08-31T18:30:00Z 64.45
2021-08-31T19:00:00Z 64.45
2021-08-31T19:30:00Z 64.23333333333333
2021-08-31T20:00:00Z 65.8
...

配置CQ的时间范围并填充空值

使用FOR间隔和fill()来更改不含数据的时间间隔值。请注意,至少有一个数据点必须在fill()运行的FOR间隔内。 如果没有数据落在FOR间隔内,则CQ不会将任何数据写入目标measurement

CREATE CONTINUOUS QUERY "cq_advanced_for_fill" ON "oceanic_station"
RESAMPLE FOR 2h
BEGIN
  SELECT mean("temperature") INTO "average_sea_temperatures" FROM "sea" GROUP BY time(1h) fill(1000)
END

最终结果如下:

...
2021-09-30T01:30:00Z 64.35
2021-09-30T02:00:00Z 63.8
2021-09-30T02:30:00Z 64.95
2021-09-30T03:00:00Z 67.225
2021-09-30T03:30:00Z 66.75
2021-09-30T04:00:00Z 54
2021-09-30T05:00:00Z 1000
2021-09-30T06:00:00Z 1000
2021-09-30T07:00:00Z 1000
2021-09-30T08:00:00Z 1000
2021-09-30T09:00:00Z 1000
...

管理CQ

CQ不能update,只能dropcreate

列出所有CQ

SHOW CONTINUOUS QUERIES

删除CQ

DROP CONTINUOUS QUERY <cq_name> ON <database_name>

CnosDB分布式部署

摘要:本次分享内容为往期内容的回顾以及新出炉的分布式版本

  1. 回顾CnosDB基本概念
  2. CnosDB分布式部署

回顾CnosDB基本概念

一些基本概念

series

point

measurement

tag

tag key

tag value

tag set

field

field key

field value

field set

timestamp

retention policy

集群角色

cnosdb

CnosDB的主进程,可以单独启动对外提供服务,也可以与cnosdb-meta公共启动对外提供服务

cnosdb-meta

CnosDB的元数据管理节点,用于协调集群,必需与cnosdb搭配使用,cnosdb-meta中只存储节点信息、节点上的分片信息,数据库,保留策略,订阅信息以及权限和角色 同时cnosdb-meta节点必需为3个及以上的奇数个,因为其使用raft协议来维护节点的一致性,它使用选举机制保证集群一定会存在一个leader节点,奇数个节点能够保证被选举的节点能够得到大多数选票,从而成为leader节点

部署架构

最小集群部署架构请参考一下图片,由3个cnosdb-meta节点和2个cnosdb节点组成 cnosdb节点的数量必需被保留策略中复制因子数量整除,如果保留策略的复制因子数量是2,cnosdb节点的数量必需为2,4,6,..., 如果保留策略的复制因子数量是3,cnosdb节点数量必需为3,6,9

分布式架构图

CnosDB分布式部署

分布式部署图

安装cnosdb-meta,并将它们加入集群

安装cnosdb,并将它们加入集群

CnosDB最佳实践

摘要:我们应该如何高效使用CnosDB

架构设计

  1. tag是用来加速查询的,但是不能参与计算
  2. field用来计算的,不能用来分组
  3. 不建议把tag拼在一起用作measurement
  4. 举例:如果对数值进行分组,可以在为指定范围的数值添加tag
  5. 在一个表当中,什么样的数据需要用来当作tag?
  6. 在一个表当中,什么样的数据需要用来当作field?
  7. 下面的这个表格应该如何设计?
timeregiondeviceidtemperaturehumiditypressurewinspeedwindirrainfall
2022-03-05T12:00:00ZBeijingATAE-075332.556.710251.276.63.5

写入优化

  1. 批量写入
  2. 对数据进行排序

存储设置和下采样

  1. wal和data目录分开存储
  2. 如何正确下采样