diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c279836b006fa029033ec0279691f056d51e140d..757fa8e74b9823dedf24ae1773f0459568c87325 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -491,7 +491,7 @@ typedef struct { char intervalUnit; char slidingUnit; char - offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. + offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. int8_t precision; int64_t interval; int64_t sliding; @@ -651,7 +651,6 @@ typedef struct SQueryNodeAddr { SEpSet epSet; } SQueryNodeAddr; - typedef struct { SArray* pArray; // Array of SUseDbRsp } SUseDbBatchRsp; @@ -724,7 +723,7 @@ typedef struct { int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); -void tFreeSFuncInfo(SFuncInfo *pInfo); +void tFreeSFuncInfo(SFuncInfo* pInfo); void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp); typedef struct { @@ -1289,14 +1288,14 @@ typedef struct { } SMVCreateStreamRsp, SMSCreateStreamRsp; typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic - int8_t igExists; - int8_t withTbName; - int8_t withSchema; - int8_t withTag; - char* sql; - char* ast; - char subscribeDbName[TSDB_DB_NAME_LEN]; + char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic + int8_t igExists; + int8_t withTbName; + int8_t withSchema; + int8_t withTag; + char* sql; + char* ast; + char subscribeDbName[TSDB_DB_NAME_LEN]; } SCMCreateTopicReq; int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 10d01f12ec7db4fb9d1b13cc97fabed05129f772..d41e797536a9c83ca2678e44eb7631b08814d67b 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -92,7 +92,7 @@ typedef struct SWalReadHead { int8_t headVer; int8_t reserved; int16_t msgType; - int32_t len; + int32_t bodyLen; int64_t ingestTs; // not implemented int64_t version; diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index f5433e8f9e06a30a14eb5fe4a867f28eadd38f05..f07534870497c2d8221be2ef784c3523e5334d52 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -172,7 +172,7 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) { bool create = false; SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key); if (pOffsetObj == NULL) { - pOffsetObj = taosMemoryMalloc(sizeof(SMqOffset)); + pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj)); memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN); create = true; } diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index 0e04793a8339dad44bd9caf7db2e659f7ea10b83..8f7a717e00dc2ec4546445b8077da0e3dc75f531 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -91,8 +91,10 @@ static SSyncEnv *doSyncEnvStart() { } static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { - taosTmrCleanUp(pSyncEnv->pTimerManager); - taosMemoryFree(pSyncEnv); + if (pSyncEnv != NULL) { + taosTmrCleanUp(pSyncEnv->pTimerManager); + taosMemoryFree(pSyncEnv); + } return 0; } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index dc903ad7b232480070587d1847d9357791678463..ae153251c3c004633c136bb288f401b72c13ad07 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -71,7 +71,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); assert(walReadWithHandle(pWalHandle, index) == 0); - SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.len); + SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); assert(pEntry != NULL); pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST; @@ -80,8 +80,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek; pEntry->term = pWalHandle->pHead->head.syncMeta.term; pEntry->index = index; - assert(pEntry->dataLen == pWalHandle->pHead->head.len); - memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.len); + assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen); + memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); // need to hold, do not new every time!! walCloseReadHandle(pWalHandle); @@ -257,4 +257,4 @@ void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) { char* serialized = logStoreSimple2Str(pLogStore); sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); taosMemoryFree(serialized); -} \ No newline at end of file +} diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index ca0c7561412f9614530e85129f0604f5c3da5db7..84fe2814ffa9377830bf5e2f02e8309aeda9f505 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -16,10 +16,10 @@ #ifndef _TD_WAL_INT_H_ #define _TD_WAL_INT_H_ -#include "tcompare.h" #include "taoserror.h" #include "tchecksum.h" #include "tcoding.h" +#include "tcompare.h" #include "wal.h" #ifdef __cplusplus @@ -41,10 +41,10 @@ typedef struct WalIdxEntry { } SWalIdxEntry; static inline int tSerializeWalIdxEntry(void** buf, SWalIdxEntry* pIdxEntry) { - int tlen; + int tlen = 0; tlen += taosEncodeFixedI64(buf, pIdxEntry->ver); tlen += taosEncodeFixedI64(buf, pIdxEntry->offset); - return 0; + return tlen; } static inline void* tDeserializeWalIdxEntry(void* buf, SWalIdxEntry* pIdxEntry) { @@ -103,7 +103,7 @@ static inline int walValidHeadCksum(SWalHead* pHead) { } static inline int walValidBodyCksum(SWalHead* pHead) { - return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.len, pHead->cksumBody); + return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.bodyLen, pHead->cksumBody); } static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 2545eeab4bbff3e9974dac705b947201a0999b07..6d598699c5eab80e64fa7582fc7fac3de3bd4d25 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -144,12 +144,12 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p taosThreadMutexUnlock(&pRead->mutex); return -1; } - *ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.len); + *ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.bodyLen); if (*ppHead == NULL) { taosThreadMutexUnlock(&pRead->mutex); return -1; } - memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.len); + memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.bodyLen); taosThreadMutexUnlock(&pRead->mutex); return 0; } @@ -178,16 +178,18 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } - if (pRead->capacity < pRead->pHead->head.len) { - void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len); + if (pRead->capacity < pRead->pHead->head.bodyLen) { + void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return -1; } pRead->pHead = ptr; - pRead->capacity = pRead->pHead->head.len; + pRead->capacity = pRead->pHead->head.bodyLen; } - if (pRead->pHead->head.len != taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.len)) { + + if (pRead->pHead->head.bodyLen != + taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) { return -1; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 81f2d82ea5515eea9bb57e6dd3d43d8c131ce86f..530930c2610c9f57865c7493f4a4bc4153221ecd 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -295,7 +295,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog pWal->writeHead.head.version = index; int64_t offset = walGetCurFileOffset(pWal); - pWal->writeHead.head.len = bodyLen; + pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.msgType = msgType; // sync info diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index b1de3a1dcee592d5e69dd40504008f9a100647bc..18345699b2f5fa55d8356cab7808a09837f61a2e 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -320,7 +320,7 @@ TEST_F(WalKeepEnv, readHandleRead) { char newStr[100]; sprintf(newStr, "%s-%d", ranStr, ver); int len = strlen(newStr); - ASSERT_EQ(pRead->pHead->head.len, len); + ASSERT_EQ(pRead->pHead->head.bodyLen, len); for (int j = 0; j < len; j++) { EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); } @@ -372,7 +372,7 @@ TEST_F(WalRetentionEnv, repairMeta1) { char newStr[100]; sprintf(newStr, "%s-%d", ranStr, ver); int len = strlen(newStr); - ASSERT_EQ(pRead->pHead->head.len, len); + ASSERT_EQ(pRead->pHead->head.bodyLen, len); for (int j = 0; j < len; j++) { EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); } @@ -402,7 +402,7 @@ TEST_F(WalRetentionEnv, repairMeta1) { char newStr[100]; sprintf(newStr, "%s-%d", ranStr, ver); int len = strlen(newStr); - ASSERT_EQ(pRead->pHead->head.len, len); + ASSERT_EQ(pRead->pHead->head.bodyLen, len); for (int j = 0; j < len; j++) { EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); } diff --git a/tests/script/tsim/tmq/basic1Of2Cons.sim b/tests/script/tsim/tmq/basic1Of2Cons.sim new file mode 100644 index 0000000000000000000000000000000000000000..5f54715b369eafe007d957a1a4268d4009e648ed --- /dev/null +++ b/tests/script/tsim/tmq/basic1Of2Cons.sim @@ -0,0 +1,394 @@ +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics +#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics +#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics +#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics + +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# + +run tsim/tmq/prepareBasicEnv-1vgrp.sim + +#---- global parameters start ----# +$dbName = db +$vgroups = 1 +$stbPrefix = stb +$ctbPrefix = ctb +$ntbPrefix = ntb +$stbNum = 1 +$ctbNum = 10 +$ntbNum = 10 +$rowsPerCtb = 10 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +#---- global parameters end ----# + +$pullDelay = 5 +$ifcheckdata = 1 +$showMsg = 1 +$showRow = 0 + +sql connect +sql use $dbName + +print == create topics from super table +sql create topic topic_stb_column as select ts, c3 from stb +sql create topic topic_stb_all as select ts, c1, c2, c3 from stb +sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb + +print == create topics from child table +sql create topic topic_ctb_column as select ts, c3 from ctb0 +sql create topic topic_ctb_all as select * from ctb0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 + +print == create topics from normal table +sql create topic topic_ntb_column as select ts, c3 from ntb0 +sql create topic topic_ntb_all as select * from ntb0 +sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 + +#sql show topics +#if $rows != 9 then +# return -1 +#endi + +$keyList = ' . group.id:cgrp1 +$keyList = $keyList . ' + +$cdb_index = 0 +#=============================== start consume =============================# + +print ================ test consume from stb +$loop_cnt = 0 +loop_consume_diff_topic_from_stb: + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdb_index = $cdb_index + 1 +$cdbName = cdb . $cdb_index +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table for stb +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + +if $loop_cnt == 0 then + print == scenario 1: topic_stb_column + $topicList = ' . topic_stb_column + $topicList = $topicList . ' +elif $loop_cnt == 1 then + print == scenario 2: topic_stb_all + $topicList = ' . topic_stb_all + $topicList = $topicList . ' +elif $loop_cnt == 2 then + print == scenario 3: topic_stb_function + $topicList = ' . topic_stb_function + $topicList = $topicList . ' +else + goto loop_consume_diff_topic_from_stb_end +endi + +$consumerId = 0 +$totalMsgOfStb = $ctbNum * $rowsPerCtb +$expectmsgcnt = $totalMsgOfStb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from stb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result +wait_consumer_end_from_stb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_stb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +if $data[0][2] != 0 then + if $data[0][2] != $expectmsgcnt then + return -1 + endi + if $data[1][2] != 0 then + return -1 + endi +endi +if $data[1][2] != 0 then + if $data[1][2] != $expectmsgcnt then + return -1 + endi + if $data[0][2] != 0 then + return -1 + endi +endi + +if $data[0][3] != 0 then + if $data[0][3] != $expectmsgcnt then + return -1 + endi + if $data[1][3] != 0 then + return -1 + endi +endi +if $data[1][3] != 0 then + if $data[1][3] != $expectmsgcnt then + return -1 + endi + if $data[0][3] != 0 then + return -1 + endi +endi +$loop_cnt = $loop_cnt + 1 +goto loop_consume_diff_topic_from_stb +loop_consume_diff_topic_from_stb_end: + +print ================ test consume from ctb +$loop_cnt = 0 +loop_consume_diff_topic_from_ctb: + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdb_index = $cdb_index + 1 +$cdbName = cdb . $cdb_index +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table for ctb +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + +if $loop_cnt == 0 then + print == scenario 1: topic_ctb_column + $topicList = ' . topic_ctb_column + $topicList = $topicList . ' +elif $loop_cnt == 1 then + print == scenario 2: topic_ctb_all + $topicList = ' . topic_ctb_all + $topicList = $topicList . ' +elif $loop_cnt == 2 then + print == scenario 3: topic_ctb_function + $topicList = ' . topic_ctb_function + $topicList = $topicList . ' +else + goto loop_consume_diff_topic_from_ctb_end +endi + +$consumerId = 0 +$totalMsgOfCtb = $rowsPerCtb +$expectmsgcnt = $totalMsgOfCtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from ctb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result +wait_consumer_end_from_ctb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_ctb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +if $data[0][2] != $totalMsgOfCtb then + if $data[1][2] != $totalMsgOfCtb then + return -1 + endi + if $data[0][2] != 0 then + return -1 + endi +endi +if $data[1][2] != $totalMsgOfCtb then + if $data[0][2] != $totalMsgOfCtb then + return -1 + endi + if $data[1][2] != 0 then + return -1 + endi +endi + +if $data[0][3] != $totalMsgOfCtb then + if $data[1][3] != $totalMsgOfCtb then + return -1 + endi + if $data[0][3] != 0 then + return -1 + endi +endi +if $data[1][3] != $totalMsgOfCtb then + if $data[0][3] != $totalMsgOfCtb then + return -1 + endi + if $data[1][3] != 0 then + return -1 + endi +endi + +$loop_cnt = $loop_cnt + 1 +goto loop_consume_diff_topic_from_ctb +loop_consume_diff_topic_from_ctb_end: + +print ================ test consume from ntb +$loop_cnt = 0 +loop_consume_diff_topic_from_ntb: + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdb_index = $cdb_index + 1 +$cdbName = cdb . $cdb_index +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table for ntb +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + +if $loop_cnt == 0 then + print == scenario 1: topic_ntb_column + $topicList = ' . topic_ntb_column + $topicList = $topicList . ' +elif $loop_cnt == 1 then + print == scenario 2: topic_ntb_all + $topicList = ' . topic_ntb_all + $topicList = $topicList . ' +elif $loop_cnt == 2 then + print == scenario 3: topic_ntb_function + $topicList = ' . topic_ntb_function + $topicList = $topicList . ' +else + goto loop_consume_diff_topic_from_ntb_end +endi + +$consumerId = 0 +$totalMsgOfNtb = $rowsPerCtb +$expectmsgcnt = $totalMsgOfNtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from ntb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result from ntb +wait_consumer_end_from_ntb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_ntb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[1][1] == 0 then + if $data[0][1] != 1 then + return -1 + endi +endi + +if $data[0][2] != $totalMsgOfNtb then + if $data[1][2] != $totalMsgOfNtb then + return -1 + endi + if $data[0][2] != 0 then + return -1 + endi +endi +if $data[1][2] != $totalMsgOfNtb then + if $data[0][2] != $totalMsgOfNtb then + return -1 + endi + if $data[1][2] != 0 then + return -1 + endi +endi + +if $data[0][3] != $totalMsgOfNtb then + if $data[1][3] != $totalMsgOfNtb then + return -1 + endi + if $data[0][3] != 0 then + return -1 + endi +endi +if $data[1][3] != $totalMsgOfNtb then + if $data[0][3] != $totalMsgOfNtb then + return -1 + endi + if $data[1][3] != 0 then + return -1 + endi +endi +$loop_cnt = $loop_cnt + 1 +goto loop_consume_diff_topic_from_ntb +loop_consume_diff_topic_from_ntb_end: + +#------ not need stop consumer, because it exit after pull msg overthan expect msg +#system tsim/tmq/consume.sh -s stop -x SIGINT + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/basic2Of2Cons.sim b/tests/script/tsim/tmq/basic2Of2Cons.sim new file mode 100644 index 0000000000000000000000000000000000000000..354260fb4756333d7b6657475e55b65d322ffc48 --- /dev/null +++ b/tests/script/tsim/tmq/basic2Of2Cons.sim @@ -0,0 +1,333 @@ +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics +#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics +#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics +#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics + +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# + +run tsim/tmq/prepareBasicEnv-1vgrp.sim + +#---- global parameters start ----# +$dbName = db +$vgroups = 1 +$stbPrefix = stb +$ctbPrefix = ctb +$ntbPrefix = ntb +$stbNum = 1 +$ctbNum = 10 +$ntbNum = 10 +$rowsPerCtb = 10 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +#---- global parameters end ----# + +$pullDelay = 5 +$ifcheckdata = 1 +$showMsg = 1 +$showRow = 0 + +sql connect +sql use $dbName + +print == create topics from super table +sql create topic topic_stb_column as select ts, c3 from stb +sql create topic topic_stb_all as select ts, c1, c2, c3 from stb +sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb + +print == create topics from child table +sql create topic topic_ctb_column as select ts, c3 from ctb0 +sql create topic topic_ctb_all as select * from ctb0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 + +print == create topics from normal table +sql create topic topic_ntb_column as select ts, c3 from ntb0 +sql create topic topic_ntb_all as select * from ntb0 +sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 + +#sql show topics +#if $rows != 9 then +# return -1 +#endi + +$keyList = ' . group.id:cgrp1 +$keyList = $keyList . ' + +$topicNum = 3 + +#=============================== start consume =============================# + + +print ================ test consume from stb +print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function +$topicList = ' . topic_stb_column +$topicList = $topicList . , +$topicList = $topicList . topic_stb_all +$topicList = $topicList . , +$topicList = $topicList . topic_stb_function +$topicList = $topicList . ' + +$consumerId = 0 +$totalMsgOfStb = $ctbNum * $rowsPerCtb +$totalMsgOfStb = $totalMsgOfStb * $topicNum +$expectmsgcnt = $totalMsgOfStb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from stb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start + +print == check consume result +wait_consumer_end_from_stb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_stb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +if $data[0][2] != 0 then + if $data[0][2] != $expectmsgcnt then + return -1 + endi + if $data[1][2] != 0 then + return -1 + endi +endi +if $data[1][2] != 0 then + if $data[1][2] != $expectmsgcnt then + return -1 + endi + if $data[0][2] != 0 then + return -1 + endi +endi + +if $data[0][3] != 0 then + if $data[0][3] != $expectmsgcnt then + return -1 + endi + if $data[1][3] != 0 then + return -1 + endi +endi +if $data[1][3] != 0 then + if $data[1][3] != $expectmsgcnt then + return -1 + endi + if $data[0][3] != 0 then + return -1 + endi +endi + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdbName = cdb1 +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table for ctb +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + + +print ================ test consume from ctb +print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function +$topicList = ' . topic_ctb_column +$topicList = $topicList . , +$topicList = $topicList . topic_ctb_all +$topicList = $topicList . , +$topicList = $topicList . topic_ctb_function +$topicList = $topicList . ' + +$consumerId = 0 +$totalMsgOfCtb = $rowsPerCtb * $topicNum +$expectmsgcnt = $totalMsgOfCtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from ctb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result +wait_consumer_end_from_ctb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_ctb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +if $data[0][2] != $totalMsgOfCtb then + if $data[1][2] != $totalMsgOfCtb then + return -1 + endi + if $data[0][2] != 0 then + return -1 + endi +endi +if $data[1][2] != $totalMsgOfCtb then + if $data[0][2] != $totalMsgOfCtb then + return -1 + endi + if $data[1][2] != 0 then + return -1 + endi +endi + +if $data[0][3] != $totalMsgOfCtb then + if $data[1][3] != $totalMsgOfCtb then + return -1 + endi + if $data[0][3] != 0 then + return -1 + endi +endi +if $data[1][3] != $totalMsgOfCtb then + if $data[0][3] != $totalMsgOfCtb then + return -1 + endi + if $data[1][3] != 0 then + return -1 + endi +endi + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdbName = cdb2 +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table for ntb +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + + +print ================ test consume from ntb +print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function +$topicList = ' . topic_ntb_column +$topicList = $topicList . , +$topicList = $topicList . topic_ntb_all +$topicList = $topicList . , +$topicList = $topicList . topic_ntb_function +$topicList = $topicList . ' + +$consumerId = 0 +$totalMsgOfNtb = $rowsPerCtb * $topicNum +$expectmsgcnt = $totalMsgOfNtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from ntb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result from ntb +wait_consumer_end_from_ntb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_ntb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[1][1] == 0 then + if $data[0][1] != 1 then + return -1 + endi +endi + +if $data[0][2] != $totalMsgOfNtb then + if $data[1][2] != $totalMsgOfNtb then + return -1 + endi + if $data[0][2] != 0 then + return -1 + endi +endi +if $data[1][2] != $totalMsgOfNtb then + if $data[0][2] != $totalMsgOfNtb then + return -1 + endi + if $data[1][2] != 0 then + return -1 + endi +endi + +if $data[0][3] != $totalMsgOfNtb then + if $data[1][3] != $totalMsgOfNtb then + return -1 + endi + if $data[0][3] != 0 then + return -1 + endi +endi +if $data[1][3] != $totalMsgOfNtb then + if $data[0][3] != $totalMsgOfNtb then + return -1 + endi + if $data[1][3] != 0 then + return -1 + endi +endi + +#------ not need stop consumer, because it exit after pull msg overthan expect msg +#system tsim/tmq/consume.sh -s stop -x SIGINT + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/basic3Of2Cons.sim b/tests/script/tsim/tmq/basic3Of2Cons.sim new file mode 100644 index 0000000000000000000000000000000000000000..ba3b54e43b39918dee3ae390664b04f9a7857604 --- /dev/null +++ b/tests/script/tsim/tmq/basic3Of2Cons.sim @@ -0,0 +1,382 @@ +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics +#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics +#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics +#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics + +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# + +run tsim/tmq/prepareBasicEnv-4vgrp.sim + +#---- global parameters start ----# +$dbName = db +$vgroups = 4 +$stbPrefix = stb +$ctbPrefix = ctb +$ntbPrefix = ntb +$stbNum = 1 +$ctbNum = 10 +$ntbNum = 10 +$rowsPerCtb = 10 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +#---- global parameters end ----# + +$pullDelay = 5 +$ifcheckdata = 1 +$showMsg = 1 +$showRow = 0 + +sql connect +sql use $dbName + +print == create topics from super table +sql create topic topic_stb_column as select ts, c3 from stb +sql create topic topic_stb_all as select ts, c1, c2, c3 from stb +sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb + +print == create topics from child table +sql create topic topic_ctb_column as select ts, c3 from ctb0 +sql create topic topic_ctb_all as select * from ctb0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 + +print == create topics from normal table +sql create topic topic_ntb_column as select ts, c3 from ntb0 +sql create topic topic_ntb_all as select * from ntb0 +sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 + +#sql show topics +#if $rows != 9 then +# return -1 +#endi + +$keyList = ' . group.id:cgrp1 +$keyList = $keyList . ' + +$cdb_index = 0 +#=============================== start consume =============================# + +print ================ test consume from stb +$loop_cnt = 0 +loop_consume_diff_topic_from_stb: + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdb_index = $cdb_index + 1 +$cdbName = cdb . $cdb_index +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + +if $loop_cnt == 0 then + print == scenario 1: topic_stb_column + $topicList = ' . topic_stb_column + $topicList = $topicList . ' +elif $loop_cnt == 1 then + print == scenario 2: topic_stb_all + $topicList = ' . topic_stb_all + $topicList = $topicList . ' +elif $loop_cnt == 2 then + print == scenario 3: topic_stb_function + $topicList = ' . topic_stb_function + $topicList = $topicList . ' +else + goto loop_consume_diff_topic_from_stb_end +endi + +$consumerId = 0 +$totalMsgOfStb = $ctbNum * $rowsPerCtb +$expectmsgcnt = $totalMsgOfStb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from stb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result +wait_consumer_end_from_stb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_stb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +if $data[0][2] <= 0 then + return -1 +endi +if $data[0][2] >= $expectmsgcnt then + return -1 +endi + +if $data[1][2] <= 0 then + return -1 +endi +if $data[1][2] >= $expectmsgcnt then + return -1 +endi + +if $data[0][3] <= 0 then + return -1 +endi +if $data[0][3] >= $expectmsgcnt then + return -1 +endi + +if $data[1][3] <= 0 then + return -1 +endi +if $data[1][3] >= $expectmsgcnt then + return -1 +endi +$loop_cnt = $loop_cnt + 1 +goto loop_consume_diff_topic_from_stb +loop_consume_diff_topic_from_stb_end: + +print ================ test consume from ctb +$loop_cnt = 0 +loop_consume_diff_topic_from_ctb: + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdb_index = $cdb_index + 1 +$cdbName = cdb . $cdb_index +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + +if $loop_cnt == 0 then + print == scenario 1: topic_ctb_column + $topicList = ' . topic_ctb_column + $topicList = $topicList . ' +elif $loop_cnt == 1 then + print == scenario 2: topic_ctb_all + $topicList = ' . topic_ctb_all + $topicList = $topicList . ' +elif $loop_cnt == 2 then + print == scenario 3: topic_ctb_function + $topicList = ' . topic_ctb_function + $topicList = $topicList . ' +else + goto loop_consume_diff_topic_from_ctb_end +endi + +$consumerId = 0 +$totalMsgOfCtb = $rowsPerCtb +$expectmsgcnt = $totalMsgOfCtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from ctb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result +wait_consumer_end_from_ctb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_ctb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +if $data[0][2] != $totalMsgOfCtb then + if $data[1][2] != $totalMsgOfCtb then + return -1 + endi + if $data[0][2] != 0 then + return -1 + endi +endi +if $data[1][2] != $totalMsgOfCtb then + if $data[0][2] != $totalMsgOfCtb then + return -1 + endi + if $data[1][2] != 0 then + return -1 + endi +endi + +if $data[0][3] != $totalMsgOfCtb then + if $data[1][3] != $totalMsgOfCtb then + return -1 + endi + if $data[0][3] != 0 then + return -1 + endi +endi +if $data[1][3] != $totalMsgOfCtb then + if $data[0][3] != $totalMsgOfCtb then + return -1 + endi + if $data[1][3] != 0 then + return -1 + endi +endi +$loop_cnt = $loop_cnt + 1 +goto loop_consume_diff_topic_from_ctb +loop_consume_diff_topic_from_ctb_end: + +print ================ test consume from ntb +$loop_cnt = 0 +loop_consume_diff_topic_from_ntb: + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdb_index = $cdb_index + 1 +$cdbName = cdb . $cdb_index +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + +if $loop_cnt == 0 then + print == scenario 1: topic_ntb_column + $topicList = ' . topic_ntb_column + $topicList = $topicList . ' +elif $loop_cnt == 1 then + print == scenario 2: topic_ntb_all + $topicList = ' . topic_ntb_all + $topicList = $topicList . ' +elif $loop_cnt == 2 then + print == scenario 3: topic_ntb_function + $topicList = ' . topic_ntb_function + $topicList = $topicList . ' +else + goto loop_consume_diff_topic_from_ntb_end +endi + +$consumerId = 0 +$totalMsgOfNtb = $rowsPerCtb +$expectmsgcnt = $totalMsgOfNtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from ntb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result from ntb +wait_consumer_end_from_ntb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_ntb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[1][1] == 0 then + if $data[0][1] != 1 then + return -1 + endi +endi + +if $data[0][2] != $totalMsgOfNtb then + if $data[1][2] != $totalMsgOfNtb then + return -1 + endi + if $data[0][2] != 0 then + return -1 + endi +endi +if $data[1][2] != $totalMsgOfNtb then + if $data[0][2] != $totalMsgOfNtb then + return -1 + endi + if $data[1][2] != 0 then + return -1 + endi +endi + +if $data[0][3] != $totalMsgOfNtb then + if $data[1][3] != $totalMsgOfNtb then + return -1 + endi + if $data[0][3] != 0 then + return -1 + endi +endi +if $data[1][3] != $totalMsgOfNtb then + if $data[0][3] != $totalMsgOfNtb then + return -1 + endi + if $data[1][3] != 0 then + return -1 + endi +endi +$loop_cnt = $loop_cnt + 1 +goto loop_consume_diff_topic_from_ntb +loop_consume_diff_topic_from_ntb_end: + +#------ not need stop consumer, because it exit after pull msg overthan expect msg +#system tsim/tmq/consume.sh -s stop -x SIGINT + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/basic4Of2Cons.sim b/tests/script/tsim/tmq/basic4Of2Cons.sim new file mode 100644 index 0000000000000000000000000000000000000000..75868bd9746b3526ec384b21715848747547e0f5 --- /dev/null +++ b/tests/script/tsim/tmq/basic4Of2Cons.sim @@ -0,0 +1,324 @@ +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics +#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics +#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics +#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics + +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# + +run tsim/tmq/prepareBasicEnv-4vgrp.sim + +#---- global parameters start ----# +$dbName = db +$vgroups = 4 +$stbPrefix = stb +$ctbPrefix = ctb +$ntbPrefix = ntb +$stbNum = 1 +$ctbNum = 10 +$ntbNum = 10 +$rowsPerCtb = 10 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +#---- global parameters end ----# + +$pullDelay = 5 +$ifcheckdata = 1 +$showMsg = 1 +$showRow = 0 + +sql connect +sql use $dbName + +print == create topics from super table +sql create topic topic_stb_column as select ts, c3 from stb +sql create topic topic_stb_all as select ts, c1, c2, c3 from stb +sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb + +print == create topics from child table +sql create topic topic_ctb_column as select ts, c3 from ctb0 +sql create topic topic_ctb_all as select * from ctb0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0 + +print == create topics from normal table +sql create topic topic_ntb_column as select ts, c3 from ntb0 +sql create topic topic_ntb_all as select * from ntb0 +sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 + +#sql show topics +#if $rows != 9 then +# return -1 +#endi + +$keyList = ' . group.id:cgrp1 +$keyList = $keyList . ' + +$topicNum = 3 + +print ================ test consume from stb +print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function +$topicList = ' . topic_stb_column +$topicList = $topicList . , +$topicList = $topicList . topic_stb_all +$topicList = $topicList . , +$topicList = $topicList . topic_stb_function +$topicList = $topicList . ' + +$consumerId = 0 +$totalMsgOfStb = $ctbNum * $rowsPerCtb +$totalMsgOfStb = $totalMsgOfStb * $topicNum +$expectmsgcnt = $totalMsgOfStb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from stb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start + +print == check consume result +wait_consumer_end_from_stb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_stb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +if $data[0][2] <= 0 then + return -1 +endi +if $data[0][2] >= $expectmsgcnt then + return -1 +endi + +if $data[1][2] <= 0 then + return -1 +endi +if $data[1][2] >= $expectmsgcnt then + return -1 +endi + +if $data[0][3] <= 0 then + return -1 +endi +if $data[0][3] >= $expectmsgcnt then + return -1 +endi + +if $data[1][3] <= 0 then + return -1 +endi +if $data[1][3] >= $expectmsgcnt then + return -1 +endi + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdbName = cdb1 +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + + +print ================ test consume from ctb +print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function +$topicList = ' . topic_ctb_column +$topicList = $topicList . , +$topicList = $topicList . topic_ctb_all +$topicList = $topicList . , +$topicList = $topicList . topic_ctb_function +$topicList = $topicList . ' + +$consumerId = 0 +$totalMsgOfCtb = $rowsPerCtb * $topicNum +$expectmsgcnt = $totalMsgOfCtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from ctb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result +wait_consumer_end_from_ctb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_ctb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[0][1] == 1 then + if $data[1][1] != 0 then + return -1 + endi +endi + +if $data[0][2] != $totalMsgOfCtb then + if $data[1][2] != $totalMsgOfCtb then + return -1 + endi + if $data[0][2] != 0 then + return -1 + endi +endi +if $data[1][2] != $totalMsgOfCtb then + if $data[0][2] != $totalMsgOfCtb then + return -1 + endi + if $data[1][2] != 0 then + return -1 + endi +endi + +if $data[0][3] != $totalMsgOfCtb then + if $data[1][3] != $totalMsgOfCtb then + return -1 + endi + if $data[0][3] != 0 then + return -1 + endi +endi +if $data[1][3] != $totalMsgOfCtb then + if $data[0][3] != $totalMsgOfCtb then + return -1 + endi + if $data[1][3] != 0 then + return -1 + endi +endi + +####################################################################################### +# clear consume info and consume result +#run tsim/tmq/clearConsume.sim +# because drop table function no stable, so by create new db for consume info and result. Modify it later +$cdbName = cdb2 +sql create database $cdbName vgroups 1 +sleep 500 +sql use $cdbName + +print == create consume info table and consume result table +sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int) +sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int) + +sql show tables +if $rows != 2 then + return -1 +endi +####################################################################################### + + +print ================ test consume from ntb +print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function +$topicList = ' . topic_ntb_column +$topicList = $topicList . , +$topicList = $topicList . topic_ntb_all +$topicList = $topicList . , +$topicList = $topicList . topic_ntb_function +$topicList = $topicList . ' + +$consumerId = 0 +$totalMsgOfNtb = $rowsPerCtb * $topicNum +$expectmsgcnt = $totalMsgOfNtb +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) +$consumerId = 1 +sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata ) + +print == start consumer to pull msgs from ntb +print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start +system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start + +print == check consume result from ntb +wait_consumer_end_from_ntb: +sql select * from consumeresult +print ==> rows: $rows +print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6] +print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] +if $rows != 2 then + sleep 1000 + goto wait_consumer_end_from_ntb +endi +if $data[0][1] == 0 then + if $data[1][1] != 1 then + return -1 + endi +endi +if $data[1][1] == 0 then + if $data[0][1] != 1 then + return -1 + endi +endi + +if $data[0][2] != $totalMsgOfNtb then + if $data[1][2] != $totalMsgOfNtb then + return -1 + endi + if $data[0][2] != 0 then + return -1 + endi +endi +if $data[1][2] != $totalMsgOfNtb then + if $data[0][2] != $totalMsgOfNtb then + return -1 + endi + if $data[1][2] != 0 then + return -1 + endi +endi + +if $data[0][3] != $totalMsgOfNtb then + if $data[1][3] != $totalMsgOfNtb then + return -1 + endi + if $data[0][3] != 0 then + return -1 + endi +endi +if $data[1][3] != $totalMsgOfNtb then + if $data[0][3] != $totalMsgOfNtb then + return -1 + endi + if $data[1][3] != 0 then + return -1 + endi +endi + +#------ not need stop consumer, because it exit after pull msg overthan expect msg +#system tsim/tmq/consume.sh -s stop -x SIGINT + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 9f6c1a59eb40efca8e0a25b3a2095dd7fc222ff1..cf627fb5fb211e6319f82b2f21210846260be509 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -371,6 +371,8 @@ void *consumeThreadFunc(void *param) { loop_consume(pInfo); + tmq_commit(pInfo->tmq, NULL, 0); + err = tmq_unsubscribe(pInfo->tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));