提交 ba32eab6 编写于 作者: dengyihao's avatar dengyihao

Merge branch '3.0' of https://github.com/taosdata/TDengine into new

...@@ -807,300 +807,9 @@ SHOW SUBSCRIPTIONS; ...@@ -807,300 +807,9 @@ SHOW SUBSCRIPTIONS;
以下是各语言的完整示例代码。 以下是各语言的完整示例代码。
<Tabs defaultValue="java" groupId="lang"> <Tabs defaultValue="java" groupId="lang">
<TabItem label="C" value="c">
```c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
static int running = 1;
static char dbName[64] = "tmqdb";
static char stbName[64] = "stb";
static char topicName[64] = "topicname";
static int32_t msg_process(TAOS_RES* msg) {
char buf[1024];
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(msg);
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
printf("topic: %s\n", topicName);
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
int32_t* length = taos_fetch_lengths(msg);
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf);
}
return rows;
}
static int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes;
// drop database if exists
printf("create database\n");
pRes = taos_query(pConn, "drop database if exists tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create database
pRes = taos_query(pConn, "create database tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create super table
printf("create super table\n");
pRes = taos_query(
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// create sub tables
printf("create sub tables\n");
pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
if (taos_errno(pRes) != 0) {
printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
// insert data
printf("insert data into sub tables\n");
pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
int32_t create_topic() {
printf("create topic\n");
TAOS_RES* pRes;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
pRes = taos_query(pConn, "use tmqdb");
if (taos_errno(pRes) != 0) {
printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
}
tmq_t* build_consumer() {
tmq_conf_res_t code;
tmq_conf_t* conf = tmq_conf_new();
code = tmq_conf_set(conf, "enable.auto.commit", "true");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "group.id", "cgrpName");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "client.id", "user defined name");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "td.connect.user", "root");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "experimental.snapshot.enable", "true");
if (TMQ_CONF_OK != code) return NULL;
code = tmq_conf_set(conf, "msg.with.table.name", "true");
if (TMQ_CONF_OK != code) return NULL;
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list() { <TabItem label="C" value="c">
tmq_list_t* topicList = tmq_list_new(); <CDemo>
int32_t code = tmq_list_append(topicList, "topicname");
if (code) {
return NULL;
}
return topicList;
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
int32_t code;
if ((code = tmq_subscribe(tmq, topicList))) {
fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
return;
}
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 5000;
while (running) {
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
if (tmqmsg) {
msgCnt++;
totalRows += msg_process(tmqmsg);
taos_free_result(tmqmsg);
/*} else {*/
/*break;*/
}
}
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
int main(int argc, char* argv[]) {
int32_t code;
if (init_env() < 0) {
return -1;
}
if (create_topic() < 0) {
return -1;
}
tmq_t* tmq = build_consumer();
if (NULL == tmq) {
fprintf(stderr, "%% build_consumer() fail!\n");
return -1;
}
tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
return -1;
}
basic_consume_loop(tmq, topic_list);
code = tmq_unsubscribe(tmq);
if (code) {
fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code));
} else {
fprintf(stderr, "%% unsubscribe\n");
}
code = tmq_consumer_close(tmq);
if (code) {
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
} else {
fprintf(stderr, "%% Consumer closed\n");
}
return 0;
}
```
[查看源码](https://github.com/taosdata/TDengine/blob/develop/examples/c/tmq.c)
</TabItem> </TabItem>
<TabItem label="Java" value="java"> <TabItem label="Java" value="java">
...@@ -1116,22 +825,7 @@ int main(int argc, char* argv[]) { ...@@ -1116,22 +825,7 @@ int main(int argc, char* argv[]) {
</TabItem> </TabItem>
<TabItem label="Python" value="Python"> <TabItem label="Python" value="Python">
<Python />
```python
import taos
from taos.tmq import TaosConsumer
import taos
from taos.tmq import *
consumer = TaosConsumer('topic_ctb_column', group_id='vg2')
for msg in consumer:
for row in msg:
print(row)
```
[查看源码](https://github.com/taosdata/TDengine/blob/develop/docs/examples/python/tmq_example.py)
</TabItem> </TabItem>
<TabItem label="Node.JS" value="Node.JS"> <TabItem label="Node.JS" value="Node.JS">
......
```c ```c
{{#include docs/examples/c/subscribe_demo.c}} {{#include docs/examples/c/tmq-example.c}}
``` ```
\ No newline at end of file
```py ```py
{{#include docs/examples/python/subscribe_demo.py}} {{#include docs/examples/python/tmq_example.py}}
``` ```
\ No newline at end of file
...@@ -2555,10 +2555,14 @@ typedef struct { ...@@ -2555,10 +2555,14 @@ typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
int64_t ntbUid; int64_t ntbUid;
SArray* colIdList; // SArray<int16_t> SArray* colIdList; // SArray<int16_t>
} SCheckAlterInfo; } STqCheckInfo;
int32_t tEncodeSCheckAlterInfo(SEncoder* pEncoder, const SCheckAlterInfo* pInfo); int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo);
int32_t tDecodeSCheckAlterInfo(SDecoder* pDecoder, SCheckAlterInfo* pInfo); int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo);
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
} STqDelCheckInfoReq;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
......
...@@ -188,7 +188,8 @@ enum { ...@@ -188,7 +188,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset) TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_CHECK_ALTER_INFO, "vnode-alter-check-info", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ADD_CHECK_INFO, "vnode-add-check-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE_CHECK_INFO, "vnode-delete-check-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
......
...@@ -515,7 +515,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -515,7 +515,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
......
...@@ -4262,7 +4262,6 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp ...@@ -4262,7 +4262,6 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
} }
int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) { int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) {
if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1; if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1;
if (tEncodeI64(encoder, pOffset->offset) < 0) return -1; if (tEncodeI64(encoder, pOffset->offset) < 0) return -1;
...@@ -4300,7 +4299,6 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pRe ...@@ -4300,7 +4299,6 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pRe
tEndDecode(decoder); tEndDecode(decoder);
return 0; return 0;
} }
int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
...@@ -5590,7 +5588,6 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { ...@@ -5590,7 +5588,6 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
return 0; return 0;
} }
#if 1
int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
if (pVal->type == TMQ_OFFSET__RESET_NONE) { if (pVal->type == TMQ_OFFSET__RESET_NONE) {
snprintf(buf, maxLen, "offset(reset to none)"); snprintf(buf, maxLen, "offset(reset to none)");
...@@ -5609,7 +5606,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { ...@@ -5609,7 +5606,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
} }
return 0; return 0;
} }
#endif
bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
if (pLeft->type == pRight->type) { if (pLeft->type == pRight->type) {
...@@ -5643,7 +5639,7 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { ...@@ -5643,7 +5639,7 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
return 0; return 0;
} }
int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) { int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) {
if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1; if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1; if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1;
int32_t sz = taosArrayGetSize(pInfo->colIdList); int32_t sz = taosArrayGetSize(pInfo->colIdList);
...@@ -5655,7 +5651,7 @@ int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) ...@@ -5655,7 +5651,7 @@ int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo)
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeSCheckAlterInfo(SDecoder *pDecoder, SCheckAlterInfo *pInfo) { int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) {
if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1; if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1; if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1;
int32_t sz; int32_t sz;
......
...@@ -225,7 +225,8 @@ SArray *mmGetMsgHandles() { ...@@ -225,7 +225,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -361,7 +361,8 @@ SArray *vmGetMsgHandles() { ...@@ -361,7 +361,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndOffset.h" #include "mndOffset.h"
#include "mndPrivilege.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndTopic.h" #include "mndTopic.h"
...@@ -305,7 +305,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { ...@@ -305,7 +305,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
sdbRelease(pSdb, pOffset); sdbRelease(pSdb, pOffset);
} }
return code; return code;
} }
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) { int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
......
...@@ -57,7 +57,8 @@ int32_t mndInitTopic(SMnode *pMnode) { ...@@ -57,7 +57,8 @@ int32_t mndInitTopic(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_ADD_CHECK_INFO_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DELETE_CHECK_INFO_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
...@@ -450,7 +451,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -450,7 +451,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (topicObj.ntbUid != 0) { if (topicObj.ntbUid != 0) {
SCheckAlterInfo info; STqCheckInfo info;
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN); memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
info.ntbUid = topicObj.ntbUid; info.ntbUid = topicObj.ntbUid;
info.colIdList = topicObj.ntbColIds; info.colIdList = topicObj.ntbColIds;
...@@ -470,7 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -470,7 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
// encoder check alter info // encoder check alter info
int32_t len; int32_t len;
int32_t code; int32_t code;
tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code); tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
if (code < 0) { if (code < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans); mndTransDrop(pTrans);
...@@ -481,7 +482,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -481,7 +482,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, len); tEncoderInit(&encoder, abuf, len);
if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) { if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
...@@ -493,7 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -493,7 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf; action.pCont = buf;
action.contLen = sizeof(SMsgHead) + len; action.contLen = sizeof(SMsgHead) + len;
action.msgType = TDMT_VND_CHECK_ALTER_INFO; action.msgType = TDMT_VND_ADD_CHECK_INFO;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf); taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
...@@ -659,12 +660,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -659,12 +660,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
#if 0
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0); ASSERT(0);
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
return -1; return -1;
} }
#endif
// TODO check if rebalancing // TODO check if rebalancing
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
...@@ -675,6 +678,37 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -675,6 +678,37 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
return -1; return -1;
} }
if (pTopic->ntbUid != 0) {
// broadcast to all vnode
void *pIter = NULL;
SVgObj *pVgroup = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
action.msgType = TDMT_VND_DELETE_CHECK_INFO;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans);
return -1;
}
}
}
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic); int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
......
...@@ -117,16 +117,15 @@ typedef struct { ...@@ -117,16 +117,15 @@ typedef struct {
struct STQ { struct STQ {
SVnode* pVnode; SVnode* pVnode;
char* path; char* path;
SHashObj* pushMgr; // consumerId -> STqHandle* SHashObj* pPushMgr; // consumerId -> STqHandle*
SHashObj* handles; // subKey -> STqHandle SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore; STqOffsetStore* pOffsetStore;
TDB* pMetaStore; TDB* pMetaDB;
TTB* pExecStore; TTB* pExecStore;
TTB* pCheckStore;
TTB* pAlterInfoStore;
SStreamMeta* pStreamMeta; SStreamMeta* pStreamMeta;
}; };
...@@ -155,6 +154,9 @@ int32_t tqMetaClose(STQ* pTq); ...@@ -155,6 +154,9 @@ int32_t tqMetaClose(STQ* pTq);
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
int32_t tqMetaRestoreHandle(STQ* pTq); int32_t tqMetaRestoreHandle(STQ* pTq);
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
typedef struct { typedef struct {
int32_t size; int32_t size;
......
...@@ -163,13 +163,16 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); ...@@ -163,13 +163,16 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen); // tq-mq
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver); int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); // tq-stream
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
......
...@@ -60,11 +60,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -60,11 +60,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->pVnode = pVnode; pTq->pVnode = pVnode;
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
if (tqMetaOpen(pTq) < 0) { if (tqMetaOpen(pTq) < 0) {
ASSERT(0); ASSERT(0);
...@@ -85,9 +85,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -85,9 +85,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
void tqClose(STQ* pTq) { void tqClose(STQ* pTq) {
if (pTq) { if (pTq) {
tqOffsetClose(pTq->pOffsetStore); tqOffsetClose(pTq->pOffsetStore);
taosHashCleanup(pTq->handles); taosHashCleanup(pTq->pHandle);
taosHashCleanup(pTq->pushMgr); taosHashCleanup(pTq->pPushMgr);
taosHashCleanup(pTq->pAlterInfo); taosHashCleanup(pTq->pCheckInfo);
taosMemoryFree(pTq->path); taosMemoryFree(pTq->path);
tqMetaClose(pTq); tqMetaClose(pTq);
streamMetaClose(pTq->pStreamMeta); streamMetaClose(pTq->pStreamMeta);
...@@ -183,7 +183,12 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ...@@ -183,7 +183,12 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
return 0; return 0;
} }
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver) { static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
pLeft->val.version <= pRight->val.version;
}
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
STqOffset offset = {0}; STqOffset offset = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen); tDecoderInit(&decoder, msg, msgLen);
...@@ -199,19 +204,24 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve ...@@ -199,19 +204,24 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
} else if (offset.val.type == TMQ_OFFSET__LOG) { } else if (offset.val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
TD_VID(pTq->pVnode), offset.val.version); TD_VID(pTq->pVnode), offset.val.version);
if (offset.val.version + 1 == version) {
offset.val.version += 1;
}
} else { } else {
ASSERT(0); ASSERT(0);
} }
/*STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);*/ STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
/*if (pOffset != NULL) {*/ if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
/*if (pOffset->val.type == TMQ_OFFSET__LOG && pOffset->val.version < offset.val.version) {*/ return 0;
}
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
if (offset.val.type == TMQ_OFFSET__LOG) { if (offset.val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) { if (pHandle) {
if (walRefVer(pHandle->pRef, offset.val.version) < 0) { if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
ASSERT(0); ASSERT(0);
...@@ -220,6 +230,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve ...@@ -220,6 +230,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
} }
} }
// rsp
/*}*/ /*}*/
/*}*/ /*}*/
...@@ -229,15 +241,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve ...@@ -229,15 +241,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pTq->pAlterInfo, pIter); pIter = taosHashIterate(pTq->pCheckInfo, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
SCheckAlterInfo* pCheck = (SCheckAlterInfo*)pIter; STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
if (pCheck->ntbUid == tbUid) { if (pCheck->ntbUid == tbUid) {
int32_t sz = taosArrayGetSize(pCheck->colIdList); int32_t sz = taosArrayGetSize(pCheck->colIdList);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i); int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
if (forbidColId == colId) { if (forbidColId == colId) {
taosHashCancelIterate(pTq->pAlterInfo, pIter); taosHashCancelIterate(pTq->pCheckInfo, pIter);
return -1; return -1;
} }
} }
...@@ -289,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -289,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SWalCkHead* pCkHead = NULL; SWalCkHead* pCkHead = NULL;
// 1.find handle // 1.find handle
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/ /*ASSERT(pHandle);*/
if (pHandle == NULL) { if (pHandle == NULL) {
tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId, tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
...@@ -478,10 +490,10 @@ OVER: ...@@ -478,10 +490,10 @@ OVER:
return code; return code;
} }
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey)); int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
ASSERT(code == 0); ASSERT(code == 0);
tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
...@@ -492,27 +504,43 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -492,27 +504,43 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
return 0; return 0;
} }
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SCheckAlterInfo info = {0}; STqCheckInfo info = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen); tDecoderInit(&decoder, msg, msgLen);
if (tDecodeSCheckAlterInfo(&decoder, &info) < 0) { if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
if (taosHashPut(pTq->pAlterInfo, info.topic, strlen(info.topic), &info, sizeof(SCheckAlterInfo)) < 0) { if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
return 0; return 0;
} }
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req); tDecodeSMqRebVgReq(msg, &req);
// todo lock // todo lock
STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
if (req.oldConsumerId != -1) { if (req.oldConsumerId != -1) {
tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey, tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
...@@ -579,7 +607,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -579,7 +607,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
} }
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
// TODO // TODO
...@@ -668,34 +696,9 @@ FAIL: ...@@ -668,34 +696,9 @@ FAIL:
return code; return code;
} }
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
// //
return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen); return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
#if 0
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0);
goto FAIL;
}
tDecoderClear(&decoder);
if (tqExpandTask(pTq, pTask) < 0) {
goto FAIL;
}
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
return 0;
FAIL:
if (pTask) taosMemoryFree(pTask);
return -1;
#endif
} }
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
...@@ -817,7 +820,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -817,7 +820,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
} }
} }
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
......
...@@ -43,86 +43,116 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { ...@@ -43,86 +43,116 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
return 0; return 0;
} }
int32_t tqMetaRestoreHandle(STQ* pTq) { int32_t tqMetaOpen(STQ* pTq) {
TBC* pCur = NULL; if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) {
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
void* pKey = NULL; if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) {
int kLen = 0; ASSERT(0);
void* pVal = NULL; return -1;
int vLen = 0; }
SDecoder decoder;
tdbTbcMoveToFirst(pCur); if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) {
ASSERT(0);
return -1;
}
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { if (tqMetaRestoreHandle(pTq) < 0) {
STqHandle handle; return -1;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); }
tDecodeSTqHandle(&decoder, &handle);
handle.pRef = walOpenRef(pTq->pVnode->pWal); if (tqMetaRestoreCheckInfo(pTq) < 0) {
if (handle.pRef == NULL) { return -1;
ASSERT(0); }
return -1;
}
walRefVer(handle.pRef, handle.snapshotVer);
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { return 0;
SReadHandle reader = { }
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
.version = handle.snapshotVer,
};
handle.execHandle.execCol.task = qCreateQueueExecTaskInfo( int32_t tqMetaClose(STQ* pTq) {
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); if (pTq->pExecStore) {
ASSERT(handle.execHandle.execCol.task); tdbTbClose(pTq->pExecStore);
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
ASSERT(scanner);
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader);
} else {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
}
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle));
} }
if (pTq->pCheckStore) {
tdbTbcClose(pCur); tdbTbClose(pTq->pCheckStore);
}
tdbClose(pTq->pMetaDB);
return 0; return 0;
} }
int32_t tqMetaOpen(STQ* pTq) { int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) {
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) { TXN txn;
ASSERT(0); if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1; return -1;
} }
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) { if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
return -1; return -1;
} }
if (tqMetaRestoreHandle(pTq) < 0) { if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) {
return -1;
}
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
return -1; return -1;
} }
return 0; return 0;
} }
int32_t tqMetaClose(STQ* pTq) { int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
if (pTq->pExecStore) { TXN txn;
tdbTbClose(pTq->pExecStore);
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
ASSERT(0);
}
if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), &txn) < 0) {
/*ASSERT(0);*/
}
if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0);
}
return 0;
}
int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) {
ASSERT(0);
return -1;
}
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
SDecoder decoder;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqCheckInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tDecoderClear(&decoder);
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
} }
tdbClose(pTq->pMetaStore); tdbTbcClose(pCur);
return 0; return 0;
} }
...@@ -153,7 +183,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { ...@@ -153,7 +183,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
ASSERT(0); ASSERT(0);
} }
if (tdbBegin(pTq->pMetaStore, &txn) < 0) { if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0); ASSERT(0);
} }
...@@ -161,7 +191,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { ...@@ -161,7 +191,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
ASSERT(0); ASSERT(0);
} }
if (tdbCommit(pTq->pMetaStore, &txn) < 0) { if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0); ASSERT(0);
} }
...@@ -177,7 +207,7 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { ...@@ -177,7 +207,7 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
ASSERT(0); ASSERT(0);
} }
if (tdbBegin(pTq->pMetaStore, &txn) < 0) { if (tdbBegin(pTq->pMetaDB, &txn) < 0) {
ASSERT(0); ASSERT(0);
} }
...@@ -185,9 +215,67 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { ...@@ -185,9 +215,67 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
/*ASSERT(0);*/ /*ASSERT(0);*/
} }
if (tdbCommit(pTq->pMetaStore, &txn) < 0) { if (tdbCommit(pTq->pMetaDB, &txn) < 0) {
ASSERT(0); ASSERT(0);
} }
return 0; return 0;
} }
int32_t tqMetaRestoreHandle(STQ* pTq) {
TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
ASSERT(0);
return -1;
}
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
SDecoder decoder;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) {
ASSERT(0);
return -1;
}
walRefVer(handle.pRef, handle.snapshotVer);
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SReadHandle reader = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
.version = handle.snapshotVer,
};
handle.execHandle.execCol.task = qCreateQueueExecTaskInfo(
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
ASSERT(handle.execHandle.execCol.task);
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
ASSERT(scanner);
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader);
} else {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
}
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
}
tdbTbcClose(pCur);
return 0;
}
...@@ -394,7 +394,7 @@ int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) { ...@@ -394,7 +394,7 @@ int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pTq->handles, pIter); pIter = taosHashIterate(pTq->pHandle, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
STqHandle* pExec = (STqHandle*)pIter; STqHandle* pExec = (STqHandle*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
......
...@@ -165,9 +165,9 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { ...@@ -165,9 +165,9 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
STQ* pTq = pWriter->pTq; STQ* pTq = pWriter->pTq;
if (rollback) { if (rollback) {
ASSERT(0); tdbAbort(pWriter->pTq->pMetaDB, &pWriter->txn);
} else { } else {
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
if (code) goto _err; if (code) goto _err;
} }
......
...@@ -196,36 +196,42 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -196,36 +196,42 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
break; break;
/* TQ */ /* TQ */
case TDMT_VND_MQ_VG_CHANGE: case TDMT_VND_MQ_VG_CHANGE:
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessVgChangeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err; goto _err;
} }
break; break;
case TDMT_VND_MQ_VG_DELETE: case TDMT_VND_MQ_VG_DELETE:
if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { if (tqProcessVgDeleteReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
goto _err; goto _err;
} }
break; break;
case TDMT_VND_MQ_COMMIT_OFFSET: case TDMT_VND_MQ_COMMIT_OFFSET:
if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessOffsetCommitReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead), version) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err; goto _err;
} }
break; break;
case TDMT_VND_CHECK_ALTER_INFO: case TDMT_VND_ADD_CHECK_INFO:
if (tqProcessCheckAlterInfoReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessAddCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
break;
case TDMT_VND_DELETE_CHECK_INFO:
if (tqProcessDelCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err; goto _err;
} }
break; break;
case TDMT_STREAM_TASK_DEPLOY: { case TDMT_STREAM_TASK_DEPLOY: {
if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessTaskDeployReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err; goto _err;
} }
} break; } break;
case TDMT_STREAM_TASK_DROP: { case TDMT_STREAM_TASK_DROP: {
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { if (tqProcessTaskDropReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
goto _err; goto _err;
} }
} break; } break;
......
...@@ -81,7 +81,7 @@ void streamMetaClose(SStreamMeta* pMeta) { ...@@ -81,7 +81,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
} }
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) { int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
......
...@@ -205,28 +205,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { ...@@ -205,28 +205,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
uint64_t ahandle = head->ahandle; \
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { \
SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); \
if (cliMsg->type == Release) return; \
} \
tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); \
if (T_REF_VAL_GET(conn) > 1) { \
transUnrefCliHandle(conn); \
} \
destroyCmsg(pMsg); \
cliReleaseUnfinishedMsg(conn); \
transQueueClear(&conn->cliMsgs); \
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \
return; \
} \
} while (0)
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
do { \ do { \
...@@ -358,7 +336,6 @@ void cliHandleResp(SCliConn* conn) { ...@@ -358,7 +336,6 @@ void cliHandleResp(SCliConn* conn) {
if (cliRecvReleaseReq(conn, pHead)) { if (cliRecvReleaseReq(conn, pHead)) {
return; return;
} }
CONN_SHOULD_RELEASE(conn, pHead);
if (CONN_NO_PERSIST_BY_APP(conn)) { if (CONN_NO_PERSIST_BY_APP(conn)) {
pMsg = transQueuePop(&conn->cliMsgs); pMsg = transQueuePop(&conn->cliMsgs);
...@@ -1418,7 +1395,7 @@ int transReleaseCliHandle(void* handle) { ...@@ -1418,7 +1395,7 @@ int transReleaseCliHandle(void* handle) {
} }
STransMsg tmsg = {.info.handle = handle}; STransMsg tmsg = {.info.handle = handle};
// TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64());
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cmsg->msg = tmsg; cmsg->msg = tmsg;
......
...@@ -69,7 +69,7 @@ class TDTestCase: ...@@ -69,7 +69,7 @@ class TDTestCase:
comput_irate_value = origin_result[1][0]*1000/( origin_result[1][-1] - origin_result[0][-1]) comput_irate_value = origin_result[1][0]*1000/( origin_result[1][-1] - origin_result[0][-1])
else: else:
comput_irate_value = (origin_result[1][0] - origin_result[0][0])*1000/( origin_result[1][-1] - origin_result[0][-1]) comput_irate_value = (origin_result[1][0] - origin_result[0][0])*1000/( origin_result[1][-1] - origin_result[0][-1])
if abs(comput_irate_value - irate_value) <= 0.0000001: if abs(comput_irate_value - irate_value) <= 0.001: # set as 0.001 avoid floating point precision calculation errors
tdLog.info(" irate work as expected , sql is %s "% irate_sql) tdLog.info(" irate work as expected , sql is %s "% irate_sql)
else: else:
tdLog.exit(" irate work not as expected , sql is %s "% irate_sql) tdLog.exit(" irate work not as expected , sql is %s "% irate_sql)
......
...@@ -25,13 +25,13 @@ from util.cases import * ...@@ -25,13 +25,13 @@ from util.cases import *
from util.sql import * from util.sql import *
from util.dnodes import * from util.dnodes import *
dbname = 'db'
class TDTestCase: class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__) tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
def mavg_query_form(self, sel="select", func="mavg(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr="t1", condition=""): def mavg_query_form(self, sel="select", func="mavg(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr=f"{dbname}.t1", condition=""):
''' '''
mavg function: mavg function:
...@@ -50,7 +50,7 @@ class TDTestCase: ...@@ -50,7 +50,7 @@ class TDTestCase:
return f"{sel} {func} {col} {m_comm} {k} {r_comm} {alias} {fr} {table_expr} {condition}" return f"{sel} {func} {col} {m_comm} {k} {r_comm} {alias} {fr} {table_expr} {condition}"
def checkmavg(self,sel="select", func="mavg(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr="t1", condition=""): def checkmavg(self,sel="select", func="mavg(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr=f"{dbname}.t1", condition=""):
# print(self.mavg_query_form(sel=sel, func=func, col=col, m_comm=m_comm, k=k, r_comm=r_comm, alias=alias, fr=fr, # print(self.mavg_query_form(sel=sel, func=func, col=col, m_comm=m_comm, k=k, r_comm=r_comm, alias=alias, fr=fr,
# table_expr=table_expr, condition=condition)) # table_expr=table_expr, condition=condition))
line = sys._getframe().f_back.f_lineno line = sys._getframe().f_back.f_lineno
...@@ -62,7 +62,7 @@ class TDTestCase: ...@@ -62,7 +62,7 @@ class TDTestCase:
table_expr=table_expr, condition=condition table_expr=table_expr, condition=condition
)) ))
sql = "select * from t1" sql = f"select * from {dbname}.t1"
collist = tdSql.getColNameList(sql) collist = tdSql.getColNameList(sql)
if not isinstance(col, str): if not isinstance(col, str):
...@@ -326,9 +326,9 @@ class TDTestCase: ...@@ -326,9 +326,9 @@ class TDTestCase:
self.checkmavg(**case6) self.checkmavg(**case6)
# # case7~8: nested query # # case7~8: nested query
# case7 = {"table_expr": "(select c1 from stb1)"} # case7 = {"table_expr": f"(select c1 from {dbname}.stb1)"}
# self.checkmavg(**case7) # self.checkmavg(**case7)
# case8 = {"table_expr": "(select mavg(c1, 1) c1 from stb1 group by tbname)"} # case8 = {"table_expr": f"(select mavg(c1, 1) c1 from {dbname}.stb1 group by tbname)"}
# self.checkmavg(**case8) # self.checkmavg(**case8)
# case9~10: mix with tbname/ts/tag/col # case9~10: mix with tbname/ts/tag/col
...@@ -362,7 +362,7 @@ class TDTestCase: ...@@ -362,7 +362,7 @@ class TDTestCase:
self.checkmavg(**case17) self.checkmavg(**case17)
# # case18~19: with group by # # case18~19: with group by
# case19 = { # case19 = {
# "table_expr": "stb1", # "table_expr": f"{dbname}.stb1",
# "condition": "partition by tbname" # "condition": "partition by tbname"
# } # }
# self.checkmavg(**case19) # self.checkmavg(**case19)
...@@ -371,14 +371,14 @@ class TDTestCase: ...@@ -371,14 +371,14 @@ class TDTestCase:
# case20 = {"condition": "order by ts"} # case20 = {"condition": "order by ts"}
# self.checkmavg(**case20) # self.checkmavg(**case20)
#case21 = { #case21 = {
# "table_expr": "stb1", # "table_expr": f"{dbname}.stb1",
# "condition": "group by tbname order by tbname" # "condition": "group by tbname order by tbname"
#} #}
#self.checkmavg(**case21) #self.checkmavg(**case21)
# # case22: with union # # case22: with union
# case22 = { # case22 = {
# "condition": "union all select mavg( c1 , 1 ) from t2" # "condition": f"union all select mavg( c1 , 1 ) from {dbname}.t2"
# } # }
# self.checkmavg(**case22) # self.checkmavg(**case22)
...@@ -486,32 +486,33 @@ class TDTestCase: ...@@ -486,32 +486,33 @@ class TDTestCase:
#tdSql.query(" select mavg( c1 , 1 ) + 2 from t1 ") #tdSql.query(" select mavg( c1 , 1 ) + 2 from t1 ")
err41 = {"alias": "+ avg(c1)"} err41 = {"alias": "+ avg(c1)"}
self.checkmavg(**err41) # mix with arithmetic 2 self.checkmavg(**err41) # mix with arithmetic 2
err42 = {"alias": ", c1"} # err42 = {"alias": ", c1"}
self.checkmavg(**err42) # mix with other col # self.checkmavg(**err42) # mix with other col
# err43 = {"table_expr": "stb1"} # err43 = {"table_expr": f"{dbname}.stb1"}
# self.checkmavg(**err43) # select stb directly # self.checkmavg(**err43) # select stb directly
err44 = { # err44 = {
"col": "stb1.c1", # "col": "stb1.c1",
"table_expr": "stb1, stb2", # "table_expr": "stb1, stb2",
"condition": "where stb1.ts=stb2.ts and stb1.st1=stb2.st2 order by stb1.ts" # "condition": "where stb1.ts=stb2.ts and stb1.st1=stb2.st2 order by stb1.ts"
} # }
self.checkmavg(**err44) # stb join # self.checkmavg(**err44) # stb join
tdSql.query("select mavg( stb1.c1 , 1 ) from stb1, stb2 where stb1.ts=stb2.ts and stb1.st1=stb2.st2 order by stb1.ts;")
err45 = { err45 = {
"condition": "where ts>0 and ts < now interval(1h) fill(next)" "condition": "where ts>0 and ts < now interval(1h) fill(next)"
} }
self.checkmavg(**err45) # interval self.checkmavg(**err45) # interval
err46 = { err46 = {
"table_expr": "t1", "table_expr": f"{dbname}.t1",
"condition": "group by c6" "condition": "group by c6"
} }
self.checkmavg(**err46) # group by normal col self.checkmavg(**err46) # group by normal col
err47 = { err47 = {
"table_expr": "stb1", "table_expr": f"{dbname}.stb1",
"condition": "group by tbname slimit 1 " "condition": "group by tbname slimit 1 "
} }
# self.checkmavg(**err47) # with slimit # self.checkmavg(**err47) # with slimit
err48 = { err48 = {
"table_expr": "stb1", "table_expr": f"{dbname}.stb1",
"condition": "group by tbname slimit 1 soffset 1" "condition": "group by tbname slimit 1 soffset 1"
} }
# self.checkmavg(**err48) # with soffset # self.checkmavg(**err48) # with soffset
...@@ -554,8 +555,8 @@ class TDTestCase: ...@@ -554,8 +555,8 @@ class TDTestCase:
err67 = {"k": 0.999999} err67 = {"k": 0.999999}
self.checkmavg(**err67) # k: left out of [1, 1000] self.checkmavg(**err67) # k: left out of [1, 1000]
err68 = { err68 = {
"table_expr": "stb1", "table_expr": f"{dbname}.stb1",
"condition": "group by tbname order by tbname" # order by tbname not supported "condition": f"group by tbname order by tbname" # order by tbname not supported
} }
self.checkmavg(**err68) self.checkmavg(**err68)
...@@ -565,42 +566,42 @@ class TDTestCase: ...@@ -565,42 +566,42 @@ class TDTestCase:
for i in range(tbnum): for i in range(tbnum):
for j in range(data_row): for j in range(data_row):
tdSql.execute( tdSql.execute(
f"insert into t{i} values (" f"insert into {dbname}.t{i} values ("
f"{basetime + (j+1)*10}, {random.randint(-200, -1)}, {random.uniform(200, -1)}, {basetime + random.randint(-200, -1)}, " f"{basetime + (j+1)*10}, {random.randint(-200, -1)}, {random.uniform(200, -1)}, {basetime + random.randint(-200, -1)}, "
f"'binary_{j}', {random.uniform(-200, -1)}, {random.choice([0,1])}, {random.randint(-200,-1)}, " f"'binary_{j}', {random.uniform(-200, -1)}, {random.choice([0,1])}, {random.randint(-200,-1)}, "
f"{random.randint(-200, -1)}, {random.randint(-127, -1)}, 'nchar_{j}' )" f"{random.randint(-200, -1)}, {random.randint(-127, -1)}, 'nchar_{j}' )"
) )
tdSql.execute( tdSql.execute(
f"insert into t{i} values (" f"insert into {dbname}.t{i} values ("
f"{basetime - (j+1) * 10}, {random.randint(1, 200)}, {random.uniform(1, 200)}, {basetime - random.randint(1, 200)}, " f"{basetime - (j+1) * 10}, {random.randint(1, 200)}, {random.uniform(1, 200)}, {basetime - random.randint(1, 200)}, "
f"'binary_{j}_1', {random.uniform(1, 200)}, {random.choice([0, 1])}, {random.randint(1,200)}, " f"'binary_{j}_1', {random.uniform(1, 200)}, {random.choice([0, 1])}, {random.randint(1,200)}, "
f"{random.randint(1,200)}, {random.randint(1,127)}, 'nchar_{j}_1' )" f"{random.randint(1,200)}, {random.randint(1,127)}, 'nchar_{j}_1' )"
) )
tdSql.execute( tdSql.execute(
f"insert into tt{i} values ( {basetime-(j+1) * 10}, {random.randint(1, 200)} )" f"insert into {dbname}.tt{i} values ( {basetime-(j+1) * 10}, {random.randint(1, 200)} )"
) )
pass pass
def mavg_test_table(self,tbnum: int) -> None : def mavg_test_table(self,tbnum: int) -> None :
tdSql.execute("drop database if exists db") tdSql.execute(f"drop database if exists {dbname}")
tdSql.execute("create database if not exists db keep 3650") tdSql.execute(f"create database if not exists {dbname} keep 3650")
tdSql.execute("use db") tdSql.execute(f"use {dbname}")
tdSql.execute( tdSql.execute(
"create stable db.stb1 (\ f"create stable {dbname}.stb1 (\
ts timestamp, c1 int, c2 float, c3 timestamp, c4 binary(16), c5 double, c6 bool, \ ts timestamp, c1 int, c2 float, c3 timestamp, c4 binary(16), c5 double, c6 bool, \
c7 bigint, c8 smallint, c9 tinyint, c10 nchar(16)\ c7 bigint, c8 smallint, c9 tinyint, c10 nchar(16)\
) \ ) \
tags(st1 int)" tags(st1 int)"
) )
tdSql.execute( tdSql.execute(
"create stable db.stb2 (ts timestamp, c1 int) tags(st2 int)" f"create stable {dbname}.stb2 (ts timestamp, c1 int) tags(st2 int)"
) )
for i in range(tbnum): for i in range(tbnum):
tdSql.execute(f"create table t{i} using stb1 tags({i})") tdSql.execute(f"create table {dbname}.t{i} using {dbname}.stb1 tags({i})")
tdSql.execute(f"create table tt{i} using stb2 tags({i})") tdSql.execute(f"create table {dbname}.tt{i} using {dbname}.stb2 tags({i})")
pass pass
...@@ -617,25 +618,25 @@ class TDTestCase: ...@@ -617,25 +618,25 @@ class TDTestCase:
tdLog.printNoPrefix("######## insert only NULL test:") tdLog.printNoPrefix("######## insert only NULL test:")
for i in range(tbnum): for i in range(tbnum):
tdSql.execute(f"insert into t{i}(ts) values ({nowtime - 5})") tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime - 5})")
tdSql.execute(f"insert into t{i}(ts) values ({nowtime + 5})") tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime + 5})")
self.mavg_current_query() self.mavg_current_query()
self.mavg_error_query() self.mavg_error_query()
tdLog.printNoPrefix("######## insert data in the range near the max(bigint/double):") tdLog.printNoPrefix("######## insert data in the range near the max(bigint/double):")
# self.mavg_test_table(tbnum) # self.mavg_test_table(tbnum)
# tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values " # tdSql.execute(f"insert into {dbname}.t1(ts, c1,c2,c5,c7) values "
# f"({nowtime - (per_table_rows + 1) * 10}, {2**31-1}, {3.4*10**38}, {1.7*10**308}, {2**63-1})") # f"({nowtime - (per_table_rows + 1) * 10}, {2**31-1}, {3.4*10**38}, {1.7*10**308}, {2**63-1})")
# tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values " # tdSql.execute(f"insert into {dbname}.t1(ts, c1,c2,c5,c7) values "
# f"({nowtime - (per_table_rows + 2) * 10}, {2**31-1}, {3.4*10**38}, {1.7*10**308}, {2**63-1})") # f"({nowtime - (per_table_rows + 2) * 10}, {2**31-1}, {3.4*10**38}, {1.7*10**308}, {2**63-1})")
# self.mavg_current_query() # self.mavg_current_query()
# self.mavg_error_query() # self.mavg_error_query()
tdLog.printNoPrefix("######## insert data in the range near the min(bigint/double):") tdLog.printNoPrefix("######## insert data in the range near the min(bigint/double):")
# self.mavg_test_table(tbnum) # self.mavg_test_table(tbnum)
# tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values " # tdSql.execute(f"insert into {dbname}.t1(ts, c1,c2,c5,c7) values "
# f"({nowtime - (per_table_rows + 1) * 10}, {1-2**31}, {-3.4*10**38}, {-1.7*10**308}, {1-2**63})") # f"({nowtime - (per_table_rows + 1) * 10}, {1-2**31}, {-3.4*10**38}, {-1.7*10**308}, {1-2**63})")
# tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values " # tdSql.execute(f"insert into {dbname}.t1(ts, c1,c2,c5,c7) values "
# f"({nowtime - (per_table_rows + 2) * 10}, {1-2**31}, {-3.4*10**38}, {-1.7*10**308}, {512-2**63})") # f"({nowtime - (per_table_rows + 2) * 10}, {1-2**31}, {-3.4*10**38}, {-1.7*10**308}, {512-2**63})")
# self.mavg_current_query() # self.mavg_current_query()
# self.mavg_error_query() # self.mavg_error_query()
...@@ -649,9 +650,9 @@ class TDTestCase: ...@@ -649,9 +650,9 @@ class TDTestCase:
tdLog.printNoPrefix("######## insert data mix with NULL test:") tdLog.printNoPrefix("######## insert data mix with NULL test:")
for i in range(tbnum): for i in range(tbnum):
tdSql.execute(f"insert into t{i}(ts) values ({nowtime})") tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime})")
tdSql.execute(f"insert into t{i}(ts) values ({nowtime-(per_table_rows+3)*10})") tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime-(per_table_rows+3)*10})")
tdSql.execute(f"insert into t{i}(ts) values ({nowtime+(per_table_rows+3)*10})") tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime+(per_table_rows+3)*10})")
self.mavg_current_query() self.mavg_current_query()
self.mavg_error_query() self.mavg_error_query()
...@@ -664,67 +665,64 @@ class TDTestCase: ...@@ -664,67 +665,64 @@ class TDTestCase:
tdDnodes.start(index) tdDnodes.start(index)
self.mavg_current_query() self.mavg_current_query()
self.mavg_error_query() self.mavg_error_query()
tdSql.query("select mavg(1,1) from t1") tdSql.query(f"select mavg(1,1) from {dbname}.t1")
tdSql.checkRows(7) tdSql.checkRows(7)
tdSql.checkData(0,0,1.000000000) tdSql.checkData(0,0,1.000000000)
tdSql.checkData(1,0,1.000000000) tdSql.checkData(1,0,1.000000000)
tdSql.checkData(5,0,1.000000000) tdSql.checkData(5,0,1.000000000)
tdSql.query("select mavg(abs(c1),1) from t1") tdSql.query(f"select mavg(abs(c1),1) from {dbname}.t1")
tdSql.checkRows(4) tdSql.checkRows(4)
def mavg_support_stable(self): def mavg_support_stable(self):
tdSql.query(" select mavg(1,3) from stb1 ") tdSql.query(f" select mavg(1,3) from {dbname}.stb1 ")
tdSql.checkRows(68) tdSql.checkRows(68)
tdSql.checkData(0,0,1.000000000) tdSql.checkData(0,0,1.000000000)
tdSql.query("select mavg(c1,3) from stb1 partition by tbname ") tdSql.query(f"select mavg(c1,3) from {dbname}.stb1 partition by tbname ")
tdSql.checkRows(20) tdSql.checkRows(20)
# tdSql.query("select mavg(st1,3) from stb1 partition by tbname") tdSql.query(f"select mavg(st1,3) from {dbname}.stb1 partition by tbname")
# tdSql.checkRows(38) tdSql.checkRows(50)
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname") tdSql.query(f"select mavg(st1+c1,3) from {dbname}.stb1 partition by tbname")
tdSql.checkRows(20) tdSql.checkRows(20)
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname") tdSql.query(f"select mavg(st1+c1,3) from {dbname}.stb1 partition by tbname")
tdSql.checkRows(20) tdSql.checkRows(20)
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname") tdSql.query(f"select mavg(st1+c1,3) from {dbname}.stb1 partition by tbname")
tdSql.checkRows(20) tdSql.checkRows(20)
# # bug need fix
# tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname slimit 1 ")
# tdSql.checkRows(2)
# tdSql.error("select mavg(st1+c1,3) from stb1 partition by tbname limit 1 ")
# bug need fix # bug need fix
tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname") tdSql.query(f"select mavg(st1+c1,3) from {dbname}.stb1 partition by tbname")
tdSql.checkRows(20) tdSql.checkRows(20)
# bug need fix # bug need fix
# tdSql.query("select tbname , mavg(c1,3) from stb1 partition by tbname") tdSql.query(f"select tbname , mavg(c1,3) from {dbname}.stb1 partition by tbname")
# tdSql.checkRows(38) tdSql.checkRows(20)
# tdSql.query("select tbname , mavg(st1,3) from stb1 partition by tbname") tdSql.query(f"select tbname , mavg(st1,3) from {dbname}.stb1 partition by tbname")
# tdSql.checkRows(38) tdSql.checkRows(50)
# tdSql.query("select tbname , mavg(st1,3) from stb1 partition by tbname slimit 1") tdSql.query(f"select tbname , mavg(st1,3) from {dbname}.stb1 partition by tbname slimit 1")
# tdSql.checkRows(2) tdSql.checkRows(5)
# partition by tags # partition by tags
# tdSql.query("select st1 , mavg(c1,3) from stb1 partition by st1") tdSql.query(f"select st1 , mavg(c1,3) from {dbname}.stb1 partition by st1")
# tdSql.checkRows(38) tdSql.checkRows(20)
# tdSql.query("select mavg(c1,3) from stb1 partition by st1") tdSql.query(f"select mavg(c1,3) from {dbname}.stb1 partition by st1")
# tdSql.checkRows(38) tdSql.checkRows(20)
# tdSql.query("select st1 , mavg(c1,3) from stb1 partition by st1 slimit 1") tdSql.query(f"select st1 , mavg(c1,3) from {dbname}.stb1 partition by st1 slimit 1")
# tdSql.checkRows(2) tdSql.checkRows(2)
# tdSql.query("select mavg(c1,3) from stb1 partition by st1 slimit 1") tdSql.query(f"select mavg(c1,3) from {dbname}.stb1 partition by st1 slimit 1")
# tdSql.checkRows(2) tdSql.checkRows(2)
# partition by col # partition by col
# tdSql.query("select c1 , mavg(c1,3) from stb1 partition by c1") tdSql.query(f"select c1 , mavg(c1,3) from {dbname}.stb1 partition by c1")
# tdSql.checkRows(38) tdSql.checkRows(0)
# tdSql.query("select mavg(c1 ,3) from stb1 partition by c1") tdSql.query(f"select c1 , mavg(c1,1) from {dbname}.stb1 partition by c1")
# tdSql.checkRows(38) tdSql.checkRows(40)
# tdSql.query("select c1 , mavg(c1,3) from stb1 partition by st1 slimit 1") tdSql.query(f"select c1, c2, c3, c4, mavg(c1,3) from {dbname}.stb1 partition by tbname ")
# tdSql.checkRows(2) tdSql.checkRows(20)
# tdSql.query("select diff(c1) from stb1 partition by st1 slimit 1") tdSql.query(f"select c1, c2, c3, c4, mavg(123,3) from {dbname}.stb1 partition by tbname ")
# tdSql.checkRows(2) tdSql.checkRows(50)
def run(self): def run(self):
import traceback import traceback
......
...@@ -873,7 +873,7 @@ class TDTestCase: ...@@ -873,7 +873,7 @@ class TDTestCase:
# bug need fix # bug need fix
tdSql.query("select c1 ,t1, sample(c1,2) from db.stb1 partition by c1 ") tdSql.query("select c1 ,t1, sample(c1,2) from db.stb1 partition by c1 ")
tdSql.query("select sample(c1,2) from db.stb1 partition by c1 ") tdSql.query("select sample(c1,2) from db.stb1 partition by c1 ")
# tdSql.query("select c1 ,ind, sample(c1,2) from sample_db.st partition by c1 ") tdSql.query("select c1 ,ind, sample(c1,2) from sample_db.st partition by c1 ")
def run(self): def run(self):
import traceback import traceback
......
...@@ -113,7 +113,7 @@ python3 ./test.py -f 2-query/hyperloglog.py -R ...@@ -113,7 +113,7 @@ python3 ./test.py -f 2-query/hyperloglog.py -R
python3 ./test.py -f 2-query/interp.py python3 ./test.py -f 2-query/interp.py
python3 ./test.py -f 2-query/interp.py -R python3 ./test.py -f 2-query/interp.py -R
python3 ./test.py -f 2-query/irate.py python3 ./test.py -f 2-query/irate.py
# python3 ./test.py -f 2-query/irate.py -R python3 ./test.py -f 2-query/irate.py -R
python3 ./test.py -f 2-query/join.py python3 ./test.py -f 2-query/join.py
python3 ./test.py -f 2-query/join.py -R python3 ./test.py -f 2-query/join.py -R
python3 ./test.py -f 2-query/last_row.py python3 ./test.py -f 2-query/last_row.py
...@@ -169,7 +169,7 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py ...@@ -169,7 +169,7 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py
python3 ./test.py -f 2-query/elapsed.py python3 ./test.py -f 2-query/elapsed.py
python3 ./test.py -f 2-query/csum.py python3 ./test.py -f 2-query/csum.py
#python3 ./test.py -f 2-query/mavg.py python3 ./test.py -f 2-query/mavg.py
python3 ./test.py -f 2-query/sample.py python3 ./test.py -f 2-query/sample.py
python3 ./test.py -f 2-query/function_diff.py python3 ./test.py -f 2-query/function_diff.py
python3 ./test.py -f 2-query/unique.py python3 ./test.py -f 2-query/unique.py
...@@ -358,7 +358,7 @@ python3 ./test.py -f 2-query/interp.py -Q 2 ...@@ -358,7 +358,7 @@ python3 ./test.py -f 2-query/interp.py -Q 2
python3 ./test.py -f 2-query/avg.py -Q 2 python3 ./test.py -f 2-query/avg.py -Q 2
# python3 ./test.py -f 2-query/elapsed.py -Q 2 # python3 ./test.py -f 2-query/elapsed.py -Q 2
python3 ./test.py -f 2-query/csum.py -Q 2 python3 ./test.py -f 2-query/csum.py -Q 2
#python3 ./test.py -f 2-query/mavg.py -Q 2 python3 ./test.py -f 2-query/mavg.py -Q 2
python3 ./test.py -f 2-query/sample.py -Q 2 python3 ./test.py -f 2-query/sample.py -Q 2
python3 ./test.py -f 2-query/function_diff.py -Q 2 python3 ./test.py -f 2-query/function_diff.py -Q 2
python3 ./test.py -f 2-query/unique.py -Q 2 python3 ./test.py -f 2-query/unique.py -Q 2
...@@ -445,7 +445,7 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 3 ...@@ -445,7 +445,7 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 3
# python3 ./test.py -f 2-query/avg.py -Q 3 # python3 ./test.py -f 2-query/avg.py -Q 3
# python3 ./test.py -f 2-query/elapsed.py -Q 3 # python3 ./test.py -f 2-query/elapsed.py -Q 3
python3 ./test.py -f 2-query/csum.py -Q 3 python3 ./test.py -f 2-query/csum.py -Q 3
#python3 ./test.py -f 2-query/mavg.py -Q 3 python3 ./test.py -f 2-query/mavg.py -Q 3
python3 ./test.py -f 2-query/sample.py -Q 3 python3 ./test.py -f 2-query/sample.py -Q 3
python3 ./test.py -f 2-query/function_diff.py -Q 3 python3 ./test.py -f 2-query/function_diff.py -Q 3
python3 ./test.py -f 2-query/unique.py -Q 3 python3 ./test.py -f 2-query/unique.py -Q 3
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册