diff --git a/Jenkinsfile2 b/Jenkinsfile2 index cbf663cdcfeb1ed1f31adbf152dc2e3ff4fc8b66..7a6f79339901c8a9e61aa3961016378dc38c721f 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -117,27 +117,29 @@ def pre_test(){ def pre_test_win(){ bat ''' hostname + ipconfig + set date /t time /t - taskkill /f /t /im python.exe - taskkill /f /t /im bash.exe rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\debug - exit 0 ''' bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal git reset --hard git fetch || git fetch + ''' + bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git reset --hard git fetch || git fetch - git checkout -f ''' script { if (env.CHANGE_TARGET == 'master') { bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal git checkout master + ''' + bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout master ''' @@ -145,6 +147,8 @@ def pre_test_win(){ bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal git checkout 2.0 + ''' + bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout 2.0 ''' @@ -152,6 +156,8 @@ def pre_test_win(){ bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal git checkout 3.0 + ''' + bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout 3.0 ''' @@ -159,6 +165,8 @@ def pre_test_win(){ bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal git checkout develop + ''' + bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout develop ''' @@ -169,30 +177,52 @@ def pre_test_win(){ bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal git pull - git log -5 + ''' + bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git pull - git fetch origin +refs/pull/${CHANGE_ID}/merge + ''' + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community + git fetch origin +refs/pull/%CHANGE_ID%/merge + ''' + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout -qf FETCH_HEAD - git log -5 ''' } else if (env.CHANGE_URL =~ /\/TDinternal\//) { bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal git pull - git fetch origin +refs/pull/${CHANGE_ID}/merge + ''' + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git fetch origin +refs/pull/%CHANGE_ID%/merge + ''' + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal git checkout -qf FETCH_HEAD - git log -5 + ''' + bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git pull - git log -5 ''' } else { - sh ''' - echo "unmatched reposiotry ${CHANGE_URL}" + bat ''' + echo "unmatched reposiotry %CHANGE_URL%" ''' } } + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git branch + git log -5 + ''' + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community + git branch + git log -5 + ''' bat ''' cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git submodule update --init --recursive @@ -205,10 +235,15 @@ def pre_test_build_win() { cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal mkdir debug cd debug + time /t call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64 set CL=/MP8 - cmake .. -G "NMake Makefiles JOM" - jom -j 4 || exit 8 + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cmake" + time /t + cmake .. -G "NMake Makefiles JOM" || exit 7 + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> jom -j 6" + time /t + jom -j 6 || exit 8 time /t ''' return 1 @@ -226,6 +261,13 @@ pipeline { stages { stage('run test') { parallel { + stage('windows test') { + agent{label " windows10_01 || windows10_02 || windows10_03 || windows10_04 "} + steps { + pre_test_win() + pre_test_build_win() + } + } stage('linux test') { agent{label " slave3_0 || slave15 || slave16 || slave17 "} options { skipDefaultCheckout() } diff --git a/example/src/tmq.c b/example/src/tmq.c index 00eb8462f4daa00fb7308b49a42f3b63912f8c7d..1abce3f188fcefd77b4f0d0037284b55d79892b0 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -106,8 +106,8 @@ int32_t create_topic() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); + pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); + /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -239,7 +239,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { msg_process(tmqmessage); taos_free_result(tmqmessage); - tmq_commit(tmq, NULL, 1); + tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL); /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ } } diff --git a/include/client/taos.h b/include/client/taos.h index 01943578416d4a2b926d61894821d33212b7f45d..0b8c67aa794363ff851c69e5848978c78c6a4abc 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -232,11 +232,11 @@ DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); -DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); -DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param); DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets); +DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param); + #if 0 -DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); +DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); #endif diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7a60542313608c633f5da6cb088e73f5110f6b87..f7562a4b9b30aa3314fe19c789da15b4681322b8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1480,6 +1480,7 @@ typedef struct { typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; + char clientId[256]; SArray* topicNames; // SArray } SCMSubscribeReq; @@ -1487,6 +1488,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->cgroup); + tlen += taosEncodeString(buf, pReq->clientId); int32_t topicNum = taosArrayGetSize(pReq->topicNames); tlen += taosEncodeFixedI32(buf, topicNum); @@ -1500,6 +1502,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeStringTo(buf, pReq->cgroup); + buf = taosDecodeStringTo(buf, pReq->clientId); int32_t topicNum; buf = taosDecodeFixedI32(buf, &topicNum); diff --git a/include/util/tskiplist.h b/include/util/tskiplist.h index eeae1b47daf2ff62511dfe1329ef55e62ed916be..10d3dcdbaaf2cf31d5620ce8c060da5e31585181 100644 --- a/include/util/tskiplist.h +++ b/include/util/tskiplist.h @@ -56,10 +56,10 @@ typedef enum { SSkipListPutSuccess = 0, SSkipListPutEarlyStop = 1, SSkipListPutS typedef struct SSkipList { uint32_t seed; + uint16_t len; __compar_fn_t comparFn; __sl_key_fn_t keyFn; TdThreadRwlock *lock; - uint16_t len; uint8_t maxLevel; uint8_t flags; uint8_t type; // static info above diff --git a/source/common/src/systable.c b/source/common/src/systable.c index e4e5abe148130b031036ed2101b7d8075ecc687e..9fe7645e2b2c5dab0f2f588013269be53a6756f1 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -263,7 +263,7 @@ static const SSysDbTableSchema topicSchema[] = { static const SSysDbTableSchema consumerSchema[] = { {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index ec9139836a918eddffce5ed06c11434202dab810..a04417736a32d01619a011b17eab23291460ab82 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -463,7 +463,7 @@ typedef struct { typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; - char appId[TSDB_CGROUP_LEN]; + char clientId[256]; int8_t updateType; // used only for update int32_t epoch; int32_t status; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 274876567e50f86d47acadbddda15edbdca087af..4fe7d8a3997f6a0ee877d524b144e014c7321532 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -427,6 +427,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerOld = mndAcquireConsumer(pMnode, consumerId); if (pConsumerOld == NULL) { pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); + tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; pConsumerNew->rebNewTopics = newSub; subscribe.topicNames = NULL; @@ -848,11 +849,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); // app id - char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN); - varDataSetLen(appId, strlen(varDataVal(appId))); + char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN); + varDataSetLen(clientId, strlen(varDataVal(clientId))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)appId, false); + colDataAppend(pColInfo, numOfRows, (const char *)clientId, false); // status char status[20 + VARSTR_HEADER_SIZE] = {0}; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 3f559cb6c050f4e19af49af73cc33a7b9e68022a..c6eebb5c5d9600420806728066086699f6080b9b 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -492,8 +492,8 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo } static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SMnode *pMnode = pReq->info.node; + /*SSdb *pSdb = pMnode->pSdb;*/ SMDropTopicReq dropReq = {0}; if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { @@ -502,16 +502,16 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); - // if (pTopic == NULL) { - // if (dropReq.igNotExists) { - // mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); - // return 0; - // } else { - // terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; - // mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); - // return -1; - // } - // } + if (pTopic == NULL) { + if (dropReq.igNotExists) { + mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); + return 0; + } else { + terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; + mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); + return -1; + } + } if (pTopic->refConsumerCnt != 0) { mndReleaseTopic(pMnode, pTopic); @@ -528,12 +528,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); -#if 1 if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { ASSERT(0); return -1; } -#endif if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { ASSERT(0); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 6a5ad63822a9d14601bb7e4a8a95b39222d1e6b4..b1760e6daebce734951b89ae5c8220e689dabc08 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -104,6 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep int tsdbClose(STsdb** pTsdb); int tsdbBegin(STsdb* pTsdb); int tsdbCommit(STsdb* pTsdb); +int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c69f7d71d5db15a9085d0b694ec71a9a1afb3ee6..30de2dcfa0ddefc61eb8dfea089749a7f924105b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -111,6 +111,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) { pIter = taosHashIterate(pTq->execs, pIter); if (pIter == NULL) break; pExec = (STqExec*)pIter; + if (pExec->subType == TOPIC_SUB_TYPE__DB) continue; for (int32_t i = 0; i < 5; i++) { int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true); ASSERT(code == 0); diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 341ab94ca4fe4647d7db95d9948c4eee5d15451b..a67f413ba7f2016797cbcfcf90b0efe094171904 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -15,7 +15,7 @@ #include "tsdb.h" -static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg); +// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg); int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp) { SSubmitMsgIter msgIter = {0}; @@ -54,7 +54,38 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * return 0; } -static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { +#if 0 +static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow *row, TSKEY minKey, TSKEY maxKey, + TSKEY now) { + TSKEY rowKey = TD_ROW_KEY(row); + if (rowKey < minKey || rowKey > maxKey) { + tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64 + " maxKey %" PRId64 " row key %" PRId64, + REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey, + rowKey); + terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; + return -1; + } + + return 0; +} +#endif + +static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *row, TSKEY minKey, TSKEY maxKey, + TSKEY now) { + TSKEY rowKey = TD_ROW_KEY(row); + if (rowKey < minKey || rowKey > maxKey) { + tsdbError("vgId:%d table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64 + " maxKey %" PRId64 " row key %" PRId64, + REPO_ID(pTsdb), uid, now, minKey, maxKey, rowKey); + terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; + return -1; + } + + return 0; +} + +int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) { ASSERT(pMsg != NULL); // STsdbMeta * pMeta = pTsdb->tsdbMeta; SSubmitMsgIter msgIter = {0}; @@ -112,14 +143,14 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { return -1; } } - - tsdbInitSubmitBlkIter(pBlock, &blkIter); - while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { - if (tsdbCheckRowRange(pTsdb, pTable, row, minKey, maxKey, now) < 0) { +#endif + tInitSubmitBlkIter(&msgIter, pBlock, &blkIter); + while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) { + if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) { return -1; } } -#endif + } if (terrno != TSDB_CODE_SUCCESS) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 6fc9f2998b0304d08fb5349a62423022fef11717..9b6307c5f72bb2fac83a3e1a1750e09680abd6b2 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -631,6 +631,11 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__); #endif + if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) { + pRsp->code = terrno; + goto _exit; + } + // handle the request if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) { pRsp->code = TSDB_CODE_INVALID_MSG; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index db34f5dc6714ddf7b0be369c1da1ee5d5cb9b7df..3505af46e4c6ad35abb7fbdd5b4981c0bfc267be 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -869,8 +869,12 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; SScalarParam dest = {.columnData = &idata}; - scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); - + int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pBlockList); + return code; + } + int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; colInfoDataEnsureCapacity(pResColData, startOffset, pResult->info.capacity); colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); diff --git a/source/util/src/tskiplist.c b/source/util/src/tskiplist.c index 13637b8fe4971f8ed315a62a01b93a665a4d84a1..4ce668e6bab22abc74f509114ab84b0693469634 100644 --- a/source/util/src/tskiplist.c +++ b/source/util/src/tskiplist.c @@ -185,10 +185,10 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it pKey = SL_GET_NODE_KEY(pSkipList, p); compare = pSkipList->comparFn(pKey, pDataKey); - if (compare >= 0) { - if (compare == 0 && !hasDup) hasDup = true; + if (compare > 0) { break; } else { + if (compare == 0 && !hasDup) hasDup = true; px = p; p = SL_NODE_GET_FORWARD_POINTER(px, i); } diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index adbab04e07e31617b3f0aaf7abaa2f2a3c59b3dc..bdda7c453b35b3c258d0bb62703a1a78d668bd13 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -57,7 +57,7 @@ class TDSql: tdLog.notice("'reset query cache' is not supported") s = 'drop database if exists db' self.cursor.execute(s) - s = 'create database db' + s = 'create database db days 300' self.cursor.execute(s) s = 'use db' self.cursor.execute(s) diff --git a/tests/script/tsim/query/interval-offset.sim b/tests/script/tsim/query/interval-offset.sim index 68860dc2cb787afd6a2634596975f988ebabf31e..dcd88e5a0caba132ce6507bc4c0274ce40ecc301 100644 --- a/tests/script/tsim/query/interval-offset.sim +++ b/tests/script/tsim/query/interval-offset.sim @@ -5,7 +5,7 @@ sleep 500 sql connect print =============== create database -sql create database d0 +sql create database d0 days 300 sql use d0 print =============== create super table and child table @@ -254,4 +254,4 @@ endi #sql select count(*) from car where ts > '2019-05-14 00:00:00' interval(1y, 5d) -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tstream/basic1.sim b/tests/script/tsim/tstream/basic1.sim index 9965772c75eb62db7355a421e334d2cee171f1ea..0997e313c8125725cd75a45102362a93042343a0 100644 --- a/tests/script/tsim/tstream/basic1.sim +++ b/tests/script/tsim/tstream/basic1.sim @@ -409,53 +409,53 @@ sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 1 if $data11 != 4 then print ======$data11 - # return -1 + return -1 endi if $data12 != 4 then print ======$data12 - # return -1 + return -1 endi if $data13 != 10 then print ======$data13 - # return -1 + return -1 endi if $data14 != 3 then print ======$data14 - # return -1 + return -1 endi if $data15 != 1 then print ======$data15 - # return -1 + return -1 endi # row 2 if $data21 != 4 then print ======$data21 - # return -1 + return -1 endi if $data22 != 4 then print ======$data22 - # return -1 + return -1 endi if $data23 != 15 then print ======$data23 - # return -1 + return -1 endi if $data24 != 4 then print ======$data24 - # return -1 + return -1 endi if $data25 != 3 then print ======$data25 - # return -1 + return -1 endi diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index dc1a77c23155ba6bc29aed77362c58823295e063..96d7741897d2dd3d888585a4ed5a12be53f5c72f 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -368,7 +368,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { /*msg_process(tmqmessage);*/ taos_free_result(tmqmessage); - if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); + if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit_sync(tmq, NULL); } } diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index cf113369bc3eae01e019a4d97bc664f5d5a7665b..8322c1a60ce2a8d400a164aa36c87877172c4f43 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -37,10 +37,10 @@ typedef struct { TdThread thread; int32_t consumerId; - int32_t ifManualCommit; - //int32_t autoCommitIntervalMs; // 1000 ms - //char autoCommit[8]; // true, false - //char autoOffsetRest[16]; // none, earliest, latest + int32_t ifManualCommit; + // int32_t autoCommitIntervalMs; // 1000 ms + // char autoCommit[8]; // true, false + // char autoOffsetRest[16]; // none, earliest, latest int32_t ifCheckData; int64_t expectMsgCnt; @@ -99,21 +99,15 @@ static void printHelp() { } void initLogFile() { - time_t now; - struct tm curTime; - char filename[256]; + time_t now; + struct tm curTime; + char filename[256]; - now = taosTime(NULL); + now = taosTime(NULL); taosLocalTime(&now, &curTime); - sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", - configDir, - curTime.tm_year+1900, - curTime.tm_mon+1, - curTime.tm_mday, - curTime.tm_hour, - curTime.tm_min, - curTime.tm_sec); - //sprintf(filename, "%s/../log/tmqlog.txt", configDir); + sprintf(filename, "%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", configDir, curTime.tm_year + 1900, + curTime.tm_mon + 1, curTime.tm_mday, curTime.tm_hour, curTime.tm_min, curTime.tm_sec); + // sprintf(filename, "%s/../log/tmqlog.txt", configDir); TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (NULL == pFile) { fprintf(stderr, "Failed to open %s for save result\n", filename); @@ -137,9 +131,9 @@ void saveConfigToLogFile() { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); - //taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); - //taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); - //taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); + // taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); + // taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); + // taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); taosFprintfFile(g_fp, " Topics: "); for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); @@ -234,17 +228,17 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) while (1) { TAOS_ROW row = taos_fetch_row(msg); - if (row == NULL) break; + if (row == NULL) break; - TAOS_FIELD* fields = taos_fetch_fields(msg); + TAOS_FIELD* fields = taos_fetch_fields(msg); int32_t numOfFields = taos_field_count(msg); - + taos_print_row(buf, row, fields, numOfFields); - - if (0 != g_stConfInfo.showRowFlag) { + + if (0 != g_stConfInfo.showRowFlag) { taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); } - + totalRows++; } @@ -276,7 +270,7 @@ void build_consumer(SThreadInfo* pInfo) { tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); - //tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); + // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); @@ -299,7 +293,7 @@ void build_consumer(SThreadInfo* pInfo) { pInfo->tmq = tmq_consumer_new(conf, NULL, 0); tmq_conf_destroy(conf); - + return; } @@ -322,10 +316,10 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult); - time_t tTime = taosGetTimestampSec(); + time_t tTime = taosGetTimestampSec(); struct tm tm = *taosLocalTime(&tTime, NULL); taosFprintfFile(g_fp, "# save result: %d-%02d-%02d %02d:%02d:%02d, sql: %s\n", tm.tm_year + 1900, tm.tm_mon + 1, - tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, sqlStr); + tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { @@ -357,11 +351,11 @@ void loop_consume(SThreadInfo* pInfo) { totalMsgs++; if (totalRows >= pInfo->expectMsgCnt) { - taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n"); + taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n"); break; } - } else { - taosFprintfFile(g_fp, "==== delay over time, so break\n"); + } else { + taosFprintfFile(g_fp, "==== delay over time, so break\n"); break; } } @@ -389,7 +383,7 @@ void* consumeThreadFunc(void* param) { pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } - + tmq_list_destroy(pInfo->topicList); pInfo->topicList = NULL; @@ -397,17 +391,18 @@ void* consumeThreadFunc(void* param) { if (pInfo->ifManualCommit) { taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n"); - pPrint("tmq_commit() manual commit when consume end.\n"); - tmq_commit(pInfo->tmq, NULL, 0); + pPrint("tmq_commit() manual commit when consume end.\n"); + /*tmq_commit(pInfo->tmq, NULL, 0);*/ + tmq_commit_sync(pInfo->tmq, NULL); } - + err = tmq_unsubscribe(pInfo->tmq); if (err) { pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); pInfo->consumeMsgCnt = -1; return NULL; } - + err = tmq_consumer_close(pInfo->tmq); if (err) { pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); @@ -485,9 +480,9 @@ int32_t getConsumeInfo() { int32_t* lengths = taos_fetch_lengths(pRes); // set default value - //g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; - //memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); - //memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); + // g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; + // memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); + // memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); for (int i = 0; i < num_fields; ++i) { if (row[i] == NULL || 0 == i) {