提交 9a1fa11a 编写于 作者: C cpwu

Merge branch '3.0' into cpwu/3.0

import taos
from taos.tmq import *
conn = taos.connect()
# create database
conn.execute("drop database if exists py_tmq")
conn.execute("create database if not exists py_tmq vgroups 2")
# create table and stables
conn.select_db("py_tmq")
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
conn.execute("create table if not exists tb1 using stb1 tags(1)")
conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn.execute("create table if not exists tb3 using stb1 tags(3)")
# create topic
conn.execute("drop topic if exists topic_ctb_column")
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
# set consumer configure options
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
conf.set("enable.auto.commit", "true")
conf.set("msg.with.table.name", "true")
def tmq_commit_cb_print(tmq, resp, offset, param=None):
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
# build consumer
tmq = conf.new_consumer()
# build topic list
topic_list = TaosTmqList()
topic_list.append("topic_ctb_column")
# subscribe consumer
tmq.subscribe(topic_list)
# check subscriptions
sub_list = tmq.subscription()
print("subscribed topics: ",sub_list)
# start subscribe
while 1:
res = tmq.poll(1000)
if res:
topic = res.get_topic_name()
vg = res.get_vgroup_id()
db = res.get_db_name()
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
for row in res:
print(row)
tb = res.get_table_name()
print(f"from table: {tb}")
from taos.tmq import TaosConsumer
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
for msg in consumer:
for row in msg:
print(row)
......@@ -5,41 +5,17 @@ title: 使用安装包立即开始
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import PkgListV3 from "/components/PkgListV3";
在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用Docker立即体验](../../get-started/docker/)。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装.
## 安装
<Tabs>
<TabItem value="apt-get" label="apt-get">
可以使用 apt-get 工具从官方仓库安装。
**安装包仓库**
```bash
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list
```
如果安装 Beta 版需要安装包仓库
```bash
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list
```
**使用 apt-get 命令安装**
```bash
sudo apt-get update
apt-cache policy tdengine
sudo apt-get install tdengine
```
:::info
下载其他组件、最新 Beta 版及之前版本的安装包,请点击[发布历史页面](../../releases)
:::
:::tip
apt-get 方式只适用于 Debian 或 Ubuntu 系统
::::
</TabItem>
<Tabs>
<TabItem label="Deb 安装" value="debinst">
1.[发布历史页面](../../releases) 下载获得 deb 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.deb;
......@@ -64,7 +40,8 @@ sudo rpm -ivh TDengine-server-3.0.0.0-Linux-x64.rpm
<TabItem label="tar.gz 安装" value="tarinst">
1.[发布历史页面](../../releases) 下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz;
1. 从列表中下载获得 tar.gz 安装包,例如 TDengine-server-3.0.0.0-Linux-x64.tar.gz;
<PkgListV3 type={0}/>
2. 进入到 TDengine-server-3.0.0.0-Linux-x64.tar.gz 安装包所在目录,先解压文件后,进入子目录,执行其中的 install.sh 安装脚本:
```bash
......@@ -79,16 +56,46 @@ sudo ./install.sh
:::info
install.sh 安装脚本在执行过程中,会通过命令行交互界面询问一些配置信息。如果希望采取无交互安装方式,那么可以用 -e no 参数来执行 install.sh 脚本。运行 `./install.sh -h` 指令可以查看所有参数的详细说明信息。
:::
</TabItem>
<TabItem label="Windows 安装" value="windows">
<PkgListV3 type={3}/>
1.[发布历史页面](../../releases) 下载获得 exe 安装程序,例如 TDengine-server-3.0.0.0-Windows-x64.exe;
2. 运行 TDengine-server-3.0.0.0-Windows-x64.exe 来安装 TDengine。
</TabItem>
<TabItem value="apt-get" label="apt-get">
可以使用 apt-get 工具从官方仓库安装。
**安装包仓库**
```bash
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list
```
如果安装 Beta 版需要安装包仓库
```bash
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list
```
**使用 apt-get 命令安装**
```bash
sudo apt-get update
apt-cache policy tdengine
sudo apt-get install tdengine
```
:::tip
apt-get 方式只适用于 Debian 或 Ubuntu 系统
::::
</TabItem>
</Tabs>
......
......@@ -88,6 +88,72 @@ void close() throws SQLException;
```
</TabItem>
<TabItem value="Python" label="Python">
```python
class TaosConsumer():
def __init__(self, *topics, **configs)
def __iter__(self)
def __next__(self)
def sync_next(self)
def subscription(self)
def unsubscribe(self)
def close(self)
def __del__(self)
```
</TabItem>
<TabItem label="Go" value="Go">
```go
func NewConsumer(conf *Config) (*Consumer, error)
func (c *Consumer) Close() error
func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error
func (c *Consumer) FreeMessage(message unsafe.Pointer)
func (c *Consumer) Poll(timeout time.Duration) (*Result, error)
func (c *Consumer) Subscribe(topics []string) error
func (c *Consumer) Unsubscribe() error
```
</TabItem>
<TabItem value="C#" label="C#">
```C#
ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)
virtual IConsumer Build()
Consumer(ConsumerBuilder builder)
void Subscribe(IEnumerable<string> topics)
void Subscribe(string topic)
ConsumeResult Consume(int millisecondsTimeout)
List<string> Subscription()
void Unsubscribe()
void Commit(ConsumeResult consumerResult)
void Close()
```
</TabItem>
</Tabs>
## 写入数据
......@@ -230,6 +296,105 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
```
</TabItem>
<TabItem value="Python" label="Python">
Python 使用以下配置项创建一个 Consumer 实例。
| 参数名称 | 类型 | 参数说明 | 备注 |
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_user` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_pass` | string | 用于创建连接,同 `taos_connect` | |
| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | |
| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
| `client_id` | string | 客户端 ID | 最大长度:192。 |
| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 |
| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | |
| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` |
| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` |
| `timeout` | int | 消费者拉去的超时时间 | |
</TabItem>
<TabItem label="Go" value="Go">
```go
config := tmq.NewConfig()
defer config.Destroy()
err = config.SetGroupID("test")
if err != nil {
panic(err)
}
err = config.SetAutoOffsetReset("earliest")
if err != nil {
panic(err)
}
err = config.SetConnectIP("127.0.0.1")
if err != nil {
panic(err)
}
err = config.SetConnectUser("root")
if err != nil {
panic(err)
}
err = config.SetConnectPass("taosdata")
if err != nil {
panic(err)
}
err = config.SetConnectPort("6030")
if err != nil {
panic(err)
}
err = config.SetMsgWithTableName(true)
if err != nil {
panic(err)
}
err = config.EnableHeartBeat()
if err != nil {
panic(err)
}
err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) {
if result.ErrCode != 0 {
errStr := wrapper.TMQErr2Str(result.ErrCode)
err := errors.NewError(int(result.ErrCode), errStr)
panic(err)
}
})
if err != nil {
panic(err)
}
```
</TabItem>
<TabItem value="C#" label="C#">
```C#
using TDengineTMQ;
// 根据需要,设置消费组 (GourpId)、自动提交 (EnableAutoCommit)、
// 自动提交时间间隔 (AutoCommitIntervalMs)、用户名 (TDConnectUser)、密码 (TDConnectPasswd) 等参数
var cfg = new ConsumerConfig
{
EnableAutoCommit = "true"
AutoCommitIntervalMs = "1000"
GourpId = "TDengine-TMQ-C#",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
AutoOffsetReset = "earliest"
MsgWithTableName = "true",
TDConnectIp = "127.0.0.1",
TDConnectPort = "6030"
};
var consumer = new ConsumerBuilder(cfg).Build();
```
</TabItem>
</Tabs>
上述配置中包括 consumer group ID,如果多个 consumer 指定的 consumer group ID 一样,则自动形成一个 consumer group,共享消费进度。
......@@ -261,6 +426,42 @@ consumer.subscribe(topics);
```
</TabItem>
<TabItem value="Go" label="Go">
```go
consumer, err := tmq.NewConsumer(config)
if err != nil {
panic(err)
}
err = consumer.Subscribe([]string{"example_tmq_topic"})
if err != nil {
panic(err)
}
```
</TabItem>
<TabItem value="C#" label="C#">
```C#
// 创建订阅 topics 列表
List<String> topics = new List<string>();
topics.add("tmq_topic");
// 启动订阅
consumer.Subscribe(topics);
```
</TabItem>
<TabItem value="Python" label="Python">
```python
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
```
</TabItem>
</Tabs>
......@@ -293,9 +494,51 @@ while(running){
}
```
</TabItem>
<TabItem value="Python" label="Python">
```python
for msg in consumer:
for row in msg:
print(row)
```
</TabItem>
<TabItem value="Go" label="Go">
```go
for {
result, err := consumer.Poll(time.Second)
if err != nil {
panic(err)
}
fmt.Println(result)
consumer.Commit(context.Background(), result.Message)
consumer.FreeMessage(result.Message)
}
```
</TabItem>
</Tabs>
<TabItem value="C#" label="C#">
```C#
// 消费数据
while (true)
{
var consumerRes = consumer.Consume(100);
// process ConsumeResult
ProcessMsg(consumerRes);
consumer.Commit(consumerRes);
}
```
</TabItem>
</Tabs>
## 结束消费
消费结束后,应当取消订阅。
......@@ -323,6 +566,32 @@ consumer.close();
```
</TabItem>
<TabItem value="Python" label="Python">
```python
/* 取消订阅 */
consumer.unsubscribe();
/* 关闭消费 */
consumer.close();
<TabItem value="Go" label="Go">
```go
consumer.Close()
```
</TabItem>
<TabItem value="C#" label="C#">
```C#
// 取消订阅
consumer.Unsubscribe();
// 关闭消费
consumer.Close();
</TabItem>
</Tabs>
## 删除 *topic*
......@@ -671,64 +940,14 @@ int main(int argc, char* argv[]) {
```python
import taos
from taos.tmq import *
from taos.tmq import TaosConsumer
conn = taos.connect()
# create database
conn.execute("drop database if exists py_tmq")
conn.execute("create database if not exists py_tmq vgroups 2")
# create table and stables
conn.select_db("py_tmq")
conn.execute("create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
conn.execute("create table if not exists tb1 using stb1 tags(1)")
conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn.execute("create table if not exists tb3 using stb1 tags(3)")
# create topic
conn.execute("drop topic if exists topic_ctb_column")
conn.execute("create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1")
# set consumer configure options
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
conf.set("enable.auto.commit", "true")
conf.set("msg.with.table.name", "true")
def tmq_commit_cb_print(tmq, resp, offset, param=None):
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
# build consumer
tmq = conf.new_consumer()
# build topic list
topic_list = TaosTmqList()
topic_list.append("topic_ctb_column")
# subscribe consumer
tmq.subscribe(topic_list)
# check subscriptions
sub_list = tmq.subscription()
print("subscribed topics: ",sub_list)
# start subscribe
while 1:
res = tmq.poll(1000)
if res:
topic = res.get_topic_name()
vg = res.get_vgroup_id()
db = res.get_db_name()
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
for row in res:
print(row)
tb = res.get_table_name()
print(f"from table: {tb}")
import taos
from taos.tmq import *
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
for msg in consumer:
for row in msg:
print(row)
```
......
import PkgList from "/components/PkgList";
import PkgListV3 from "/components/PkgListV3";
1. 下载客户端安装包
<PkgList type={1} sys="Linux" />
<PkgListV3 type={1} sys="Linux" />
[所有下载](https://www.taosdata.com/cn/all-downloads/)
[所有下载](../../releases)
2. 解压缩软件包
......
import PkgList from "/components/PkgList";
import PkgListV3 from "/components/PkgListV3";
1. 下载客户端安装包
<PkgList type={1} sys="Windows" />
[所有下载](https://www.taosdata.com/cn/all-downloads/)
<PkgListV3 type={4} sys="Windows" />
[所有下载](../../releases)
2. 执行安装程序,按提示选择默认值,完成安装
3. 安装路径
......
......@@ -4918,6 +4918,16 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx) {
return numOfElems;
}
static SSampleInfo* getSampleOutputInfo(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
pInfo->data = (char*)pInfo + sizeof(SSampleInfo);
pInfo->tuplePos = (STuplePos*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes);
return pInfo;
}
bool getSampleFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
......@@ -4972,7 +4982,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
int32_t sampleFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
......@@ -4998,7 +5008,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo);
SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
pEntryInfo->complete = true;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册