提交 8afabff7 编写于 作者: S slzhou

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0_udfd

...@@ -491,7 +491,7 @@ typedef struct { ...@@ -491,7 +491,7 @@ typedef struct {
char intervalUnit; char intervalUnit;
char slidingUnit; char slidingUnit;
char char
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
int8_t precision; int8_t precision;
int64_t interval; int64_t interval;
int64_t sliding; int64_t sliding;
...@@ -651,7 +651,6 @@ typedef struct SQueryNodeAddr { ...@@ -651,7 +651,6 @@ typedef struct SQueryNodeAddr {
SEpSet epSet; SEpSet epSet;
} SQueryNodeAddr; } SQueryNodeAddr;
typedef struct { typedef struct {
SArray* pArray; // Array of SUseDbRsp SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp; } SUseDbBatchRsp;
...@@ -724,7 +723,7 @@ typedef struct { ...@@ -724,7 +723,7 @@ typedef struct {
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
void tFreeSFuncInfo(SFuncInfo *pInfo); void tFreeSFuncInfo(SFuncInfo* pInfo);
void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp); void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp);
typedef struct { typedef struct {
...@@ -1289,14 +1288,14 @@ typedef struct { ...@@ -1289,14 +1288,14 @@ typedef struct {
} SMVCreateStreamRsp, SMSCreateStreamRsp; } SMVCreateStreamRsp, SMSCreateStreamRsp;
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
int8_t igExists; int8_t igExists;
int8_t withTbName; int8_t withTbName;
int8_t withSchema; int8_t withSchema;
int8_t withTag; int8_t withTag;
char* sql; char* sql;
char* ast; char* ast;
char subscribeDbName[TSDB_DB_NAME_LEN]; char subscribeDbName[TSDB_DB_NAME_LEN];
} SCMCreateTopicReq; } SCMCreateTopicReq;
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq); int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
......
...@@ -92,7 +92,7 @@ typedef struct SWalReadHead { ...@@ -92,7 +92,7 @@ typedef struct SWalReadHead {
int8_t headVer; int8_t headVer;
int8_t reserved; int8_t reserved;
int16_t msgType; int16_t msgType;
int32_t len; int32_t bodyLen;
int64_t ingestTs; // not implemented int64_t ingestTs; // not implemented
int64_t version; int64_t version;
......
...@@ -172,7 +172,7 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) { ...@@ -172,7 +172,7 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) {
bool create = false; bool create = false;
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key); SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
if (pOffsetObj == NULL) { if (pOffsetObj == NULL) {
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffset)); pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj));
memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN); memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN);
create = true; create = true;
} }
......
...@@ -91,8 +91,10 @@ static SSyncEnv *doSyncEnvStart() { ...@@ -91,8 +91,10 @@ static SSyncEnv *doSyncEnvStart() {
} }
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
taosTmrCleanUp(pSyncEnv->pTimerManager); if (pSyncEnv != NULL) {
taosMemoryFree(pSyncEnv); taosTmrCleanUp(pSyncEnv->pTimerManager);
taosMemoryFree(pSyncEnv);
}
return 0; return 0;
} }
......
...@@ -71,7 +71,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -71,7 +71,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
assert(walReadWithHandle(pWalHandle, index) == 0); assert(walReadWithHandle(pWalHandle, index) == 0);
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.len); SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
assert(pEntry != NULL); assert(pEntry != NULL);
pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST; pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
...@@ -80,8 +80,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -80,8 +80,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek; pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
pEntry->term = pWalHandle->pHead->head.syncMeta.term; pEntry->term = pWalHandle->pHead->head.syncMeta.term;
pEntry->index = index; pEntry->index = index;
assert(pEntry->dataLen == pWalHandle->pHead->head.len); assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.len); memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
// need to hold, do not new every time!! // need to hold, do not new every time!!
walCloseReadHandle(pWalHandle); walCloseReadHandle(pWalHandle);
...@@ -257,4 +257,4 @@ void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) { ...@@ -257,4 +257,4 @@ void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStoreSimple2Str(pLogStore); char* serialized = logStoreSimple2Str(pLogStore);
sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized); taosMemoryFree(serialized);
} }
\ No newline at end of file
...@@ -16,10 +16,10 @@ ...@@ -16,10 +16,10 @@
#ifndef _TD_WAL_INT_H_ #ifndef _TD_WAL_INT_H_
#define _TD_WAL_INT_H_ #define _TD_WAL_INT_H_
#include "tcompare.h"
#include "taoserror.h" #include "taoserror.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tcoding.h" #include "tcoding.h"
#include "tcompare.h"
#include "wal.h" #include "wal.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -41,10 +41,10 @@ typedef struct WalIdxEntry { ...@@ -41,10 +41,10 @@ typedef struct WalIdxEntry {
} SWalIdxEntry; } SWalIdxEntry;
static inline int tSerializeWalIdxEntry(void** buf, SWalIdxEntry* pIdxEntry) { static inline int tSerializeWalIdxEntry(void** buf, SWalIdxEntry* pIdxEntry) {
int tlen; int tlen = 0;
tlen += taosEncodeFixedI64(buf, pIdxEntry->ver); tlen += taosEncodeFixedI64(buf, pIdxEntry->ver);
tlen += taosEncodeFixedI64(buf, pIdxEntry->offset); tlen += taosEncodeFixedI64(buf, pIdxEntry->offset);
return 0; return tlen;
} }
static inline void* tDeserializeWalIdxEntry(void* buf, SWalIdxEntry* pIdxEntry) { static inline void* tDeserializeWalIdxEntry(void* buf, SWalIdxEntry* pIdxEntry) {
...@@ -103,7 +103,7 @@ static inline int walValidHeadCksum(SWalHead* pHead) { ...@@ -103,7 +103,7 @@ static inline int walValidHeadCksum(SWalHead* pHead) {
} }
static inline int walValidBodyCksum(SWalHead* pHead) { static inline int walValidBodyCksum(SWalHead* pHead) {
return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.len, pHead->cksumBody); return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.bodyLen, pHead->cksumBody);
} }
static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) { static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) {
......
...@@ -144,12 +144,12 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p ...@@ -144,12 +144,12 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p
taosThreadMutexUnlock(&pRead->mutex); taosThreadMutexUnlock(&pRead->mutex);
return -1; return -1;
} }
*ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.len); *ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.bodyLen);
if (*ppHead == NULL) { if (*ppHead == NULL) {
taosThreadMutexUnlock(&pRead->mutex); taosThreadMutexUnlock(&pRead->mutex);
return -1; return -1;
} }
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.len); memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.bodyLen);
taosThreadMutexUnlock(&pRead->mutex); taosThreadMutexUnlock(&pRead->mutex);
return 0; return 0;
} }
...@@ -178,16 +178,18 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ...@@ -178,16 +178,18 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
if (pRead->capacity < pRead->pHead->head.len) { if (pRead->capacity < pRead->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len); void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1; return -1;
} }
pRead->pHead = ptr; pRead->pHead = ptr;
pRead->capacity = pRead->pHead->head.len; pRead->capacity = pRead->pHead->head.bodyLen;
} }
if (pRead->pHead->head.len != taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.len)) {
if (pRead->pHead->head.bodyLen !=
taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) {
return -1; return -1;
} }
......
...@@ -295,7 +295,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog ...@@ -295,7 +295,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
pWal->writeHead.head.version = index; pWal->writeHead.head.version = index;
int64_t offset = walGetCurFileOffset(pWal); int64_t offset = walGetCurFileOffset(pWal);
pWal->writeHead.head.len = bodyLen; pWal->writeHead.head.bodyLen = bodyLen;
pWal->writeHead.head.msgType = msgType; pWal->writeHead.head.msgType = msgType;
// sync info // sync info
......
...@@ -320,7 +320,7 @@ TEST_F(WalKeepEnv, readHandleRead) { ...@@ -320,7 +320,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
char newStr[100]; char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver); sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr); int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.len, len); ASSERT_EQ(pRead->pHead->head.bodyLen, len);
for (int j = 0; j < len; j++) { for (int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
} }
...@@ -372,7 +372,7 @@ TEST_F(WalRetentionEnv, repairMeta1) { ...@@ -372,7 +372,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
char newStr[100]; char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver); sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr); int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.len, len); ASSERT_EQ(pRead->pHead->head.bodyLen, len);
for (int j = 0; j < len; j++) { for (int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
} }
...@@ -402,7 +402,7 @@ TEST_F(WalRetentionEnv, repairMeta1) { ...@@ -402,7 +402,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
char newStr[100]; char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver); sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr); int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.len, len); ASSERT_EQ(pRead->pHead->head.bodyLen, len);
for (int j = 0; j < len; j++) { for (int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
} }
......
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-1vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . '
$cdb_index = 0
#=============================== start consume =============================#
print ================ test consume from stb
$loop_cnt = 0
loop_consume_diff_topic_from_stb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table for stb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_stb_column
$topicList = ' . topic_stb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_stb_all
$topicList = ' . topic_stb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_stb_end
endi
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
if $data[0][2] != 0 then
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[1][2] != 0 then
return -1
endi
endi
if $data[1][2] != 0 then
if $data[1][2] != $expectmsgcnt then
return -1
endi
if $data[0][2] != 0 then
return -1
endi
endi
if $data[0][3] != 0 then
if $data[0][3] != $expectmsgcnt then
return -1
endi
if $data[1][3] != 0 then
return -1
endi
endi
if $data[1][3] != 0 then
if $data[1][3] != $expectmsgcnt then
return -1
endi
if $data[0][3] != 0 then
return -1
endi
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_stb
loop_consume_diff_topic_from_stb_end:
print ================ test consume from ctb
$loop_cnt = 0
loop_consume_diff_topic_from_ctb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table for ctb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ctb_column
$topicList = ' . topic_ctb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ctb_all
$topicList = ' . topic_ctb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ctb_end
endi
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
if $data[0][2] != $totalMsgOfCtb then
if $data[1][2] != $totalMsgOfCtb then
return -1
endi
if $data[0][2] != 0 then
return -1
endi
endi
if $data[1][2] != $totalMsgOfCtb then
if $data[0][2] != $totalMsgOfCtb then
return -1
endi
if $data[1][2] != 0 then
return -1
endi
endi
if $data[0][3] != $totalMsgOfCtb then
if $data[1][3] != $totalMsgOfCtb then
return -1
endi
if $data[0][3] != 0 then
return -1
endi
endi
if $data[1][3] != $totalMsgOfCtb then
if $data[0][3] != $totalMsgOfCtb then
return -1
endi
if $data[1][3] != 0 then
return -1
endi
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ctb
loop_consume_diff_topic_from_ctb_end:
print ================ test consume from ntb
$loop_cnt = 0
loop_consume_diff_topic_from_ntb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table for ntb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ntb_column
$topicList = ' . topic_ntb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ntb_all
$topicList = ' . topic_ntb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ntb_end
endi
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[1][1] == 0 then
if $data[0][1] != 1 then
return -1
endi
endi
if $data[0][2] != $totalMsgOfNtb then
if $data[1][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][2] != 0 then
return -1
endi
endi
if $data[1][2] != $totalMsgOfNtb then
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[1][2] != 0 then
return -1
endi
endi
if $data[0][3] != $totalMsgOfNtb then
if $data[1][3] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != 0 then
return -1
endi
endi
if $data[1][3] != $totalMsgOfNtb then
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
if $data[1][3] != 0 then
return -1
endi
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ntb
loop_consume_diff_topic_from_ntb_end:
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-1vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . '
$topicNum = 3
#=============================== start consume =============================#
print ================ test consume from stb
print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function
$topicList = ' . topic_stb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfStb * $topicNum
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
if $data[0][2] != 0 then
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[1][2] != 0 then
return -1
endi
endi
if $data[1][2] != 0 then
if $data[1][2] != $expectmsgcnt then
return -1
endi
if $data[0][2] != 0 then
return -1
endi
endi
if $data[0][3] != 0 then
if $data[0][3] != $expectmsgcnt then
return -1
endi
if $data[1][3] != 0 then
return -1
endi
endi
if $data[1][3] != 0 then
if $data[1][3] != $expectmsgcnt then
return -1
endi
if $data[0][3] != 0 then
return -1
endi
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb1
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table for ctb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ctb
print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function
$topicList = ' . topic_ctb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
if $data[0][2] != $totalMsgOfCtb then
if $data[1][2] != $totalMsgOfCtb then
return -1
endi
if $data[0][2] != 0 then
return -1
endi
endi
if $data[1][2] != $totalMsgOfCtb then
if $data[0][2] != $totalMsgOfCtb then
return -1
endi
if $data[1][2] != 0 then
return -1
endi
endi
if $data[0][3] != $totalMsgOfCtb then
if $data[1][3] != $totalMsgOfCtb then
return -1
endi
if $data[0][3] != 0 then
return -1
endi
endi
if $data[1][3] != $totalMsgOfCtb then
if $data[0][3] != $totalMsgOfCtb then
return -1
endi
if $data[1][3] != 0 then
return -1
endi
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb2
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table for ntb
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ntb
print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function
$topicList = ' . topic_ntb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[1][1] == 0 then
if $data[0][1] != 1 then
return -1
endi
endi
if $data[0][2] != $totalMsgOfNtb then
if $data[1][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][2] != 0 then
return -1
endi
endi
if $data[1][2] != $totalMsgOfNtb then
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[1][2] != 0 then
return -1
endi
endi
if $data[0][3] != $totalMsgOfNtb then
if $data[1][3] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != 0 then
return -1
endi
endi
if $data[1][3] != $totalMsgOfNtb then
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
if $data[1][3] != 0 then
return -1
endi
endi
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-4vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 4
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . '
$cdb_index = 0
#=============================== start consume =============================#
print ================ test consume from stb
$loop_cnt = 0
loop_consume_diff_topic_from_stb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_stb_column
$topicList = ' . topic_stb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_stb_all
$topicList = ' . topic_stb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_stb_end
endi
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
if $data[0][2] <= 0 then
return -1
endi
if $data[0][2] >= $expectmsgcnt then
return -1
endi
if $data[1][2] <= 0 then
return -1
endi
if $data[1][2] >= $expectmsgcnt then
return -1
endi
if $data[0][3] <= 0 then
return -1
endi
if $data[0][3] >= $expectmsgcnt then
return -1
endi
if $data[1][3] <= 0 then
return -1
endi
if $data[1][3] >= $expectmsgcnt then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_stb
loop_consume_diff_topic_from_stb_end:
print ================ test consume from ctb
$loop_cnt = 0
loop_consume_diff_topic_from_ctb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ctb_column
$topicList = ' . topic_ctb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ctb_all
$topicList = ' . topic_ctb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ctb_end
endi
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
if $data[0][2] != $totalMsgOfCtb then
if $data[1][2] != $totalMsgOfCtb then
return -1
endi
if $data[0][2] != 0 then
return -1
endi
endi
if $data[1][2] != $totalMsgOfCtb then
if $data[0][2] != $totalMsgOfCtb then
return -1
endi
if $data[1][2] != 0 then
return -1
endi
endi
if $data[0][3] != $totalMsgOfCtb then
if $data[1][3] != $totalMsgOfCtb then
return -1
endi
if $data[0][3] != 0 then
return -1
endi
endi
if $data[1][3] != $totalMsgOfCtb then
if $data[0][3] != $totalMsgOfCtb then
return -1
endi
if $data[1][3] != 0 then
return -1
endi
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ctb
loop_consume_diff_topic_from_ctb_end:
print ================ test consume from ntb
$loop_cnt = 0
loop_consume_diff_topic_from_ntb:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdb_index = $cdb_index + 1
$cdbName = cdb . $cdb_index
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
if $loop_cnt == 0 then
print == scenario 1: topic_ntb_column
$topicList = ' . topic_ntb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ntb_all
$topicList = ' . topic_ntb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ntb_end
endi
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[1][1] == 0 then
if $data[0][1] != 1 then
return -1
endi
endi
if $data[0][2] != $totalMsgOfNtb then
if $data[1][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][2] != 0 then
return -1
endi
endi
if $data[1][2] != $totalMsgOfNtb then
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[1][2] != 0 then
return -1
endi
endi
if $data[0][3] != $totalMsgOfNtb then
if $data[1][3] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != 0 then
return -1
endi
endi
if $data[1][3] != $totalMsgOfNtb then
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
if $data[1][3] != 0 then
return -1
endi
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ntb
loop_consume_diff_topic_from_ntb_end:
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
run tsim/tmq/prepareBasicEnv-4vgrp.sim
#---- global parameters start ----#
$dbName = db
$vgroups = 4
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
$pullDelay = 5
$ifcheckdata = 1
$showMsg = 1
$showRow = 0
sql connect
sql use $dbName
print == create topics from super table
sql create topic topic_stb_column as select ts, c3 from stb
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print == create topics from child table
sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print == create topics from normal table
sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
#sql show topics
#if $rows != 9 then
# return -1
#endi
$keyList = ' . group.id:cgrp1
$keyList = $keyList . '
$topicNum = 3
print ================ test consume from stb
print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function
$topicList = ' . topic_stb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_stb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfStb * $topicNum
$expectmsgcnt = $totalMsgOfStb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
if $data[0][2] <= 0 then
return -1
endi
if $data[0][2] >= $expectmsgcnt then
return -1
endi
if $data[1][2] <= 0 then
return -1
endi
if $data[1][2] >= $expectmsgcnt then
return -1
endi
if $data[0][3] <= 0 then
return -1
endi
if $data[0][3] >= $expectmsgcnt then
return -1
endi
if $data[1][3] <= 0 then
return -1
endi
if $data[1][3] >= $expectmsgcnt then
return -1
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb1
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ctb
print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function
$topicList = ' . topic_ctb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ctb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfCtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ctb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[0][1] == 1 then
if $data[1][1] != 0 then
return -1
endi
endi
if $data[0][2] != $totalMsgOfCtb then
if $data[1][2] != $totalMsgOfCtb then
return -1
endi
if $data[0][2] != 0 then
return -1
endi
endi
if $data[1][2] != $totalMsgOfCtb then
if $data[0][2] != $totalMsgOfCtb then
return -1
endi
if $data[1][2] != 0 then
return -1
endi
endi
if $data[0][3] != $totalMsgOfCtb then
if $data[1][3] != $totalMsgOfCtb then
return -1
endi
if $data[0][3] != 0 then
return -1
endi
endi
if $data[1][3] != $totalMsgOfCtb then
if $data[0][3] != $totalMsgOfCtb then
return -1
endi
if $data[1][3] != 0 then
return -1
endi
endi
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb2
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
#######################################################################################
print ================ test consume from ntb
print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function
$topicList = ' . topic_ntb_column
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_all
$topicList = $topicList . ,
$topicList = $topicList . topic_ntb_function
$topicList = $topicList . '
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb * $topicNum
$expectmsgcnt = $totalMsgOfNtb
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
$consumerId = 1
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from ntb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
return -1
endi
endi
if $data[1][1] == 0 then
if $data[0][1] != 1 then
return -1
endi
endi
if $data[0][2] != $totalMsgOfNtb then
if $data[1][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][2] != 0 then
return -1
endi
endi
if $data[1][2] != $totalMsgOfNtb then
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[1][2] != 0 then
return -1
endi
endi
if $data[0][3] != $totalMsgOfNtb then
if $data[1][3] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != 0 then
return -1
endi
endi
if $data[1][3] != $totalMsgOfNtb then
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
if $data[1][3] != 0 then
return -1
endi
endi
#------ not need stop consumer, because it exit after pull msg overthan expect msg
#system tsim/tmq/consume.sh -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -371,6 +371,8 @@ void *consumeThreadFunc(void *param) { ...@@ -371,6 +371,8 @@ void *consumeThreadFunc(void *param) {
loop_consume(pInfo); loop_consume(pInfo);
tmq_commit(pInfo->tmq, NULL, 0);
err = tmq_unsubscribe(pInfo->tmq); err = tmq_unsubscribe(pInfo->tmq);
if (err) { if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册