diff --git a/README.md b/README.md index 1034a19f37a37085e9ad2bacbad1a6fd05a91321..fe9bb49ed8d70afb7b1f3a84933efd62b672229e 100644 --- a/README.md +++ b/README.md @@ -321,7 +321,11 @@ TDengine provides abundant developing tools for users to develop on TDengine. Fo Please follow the [contribution guidelines](CONTRIBUTING.md) to contribute to the project. -# Join TDengine User Community +# Join the TDengine Community -- Join [TDengine Discord Channel](https://discord.com/invite/VZdSuUg4pS?utm_id=discord) -- Join wechat group by adding WeChat “tdengine” +For more information about TDengine, you can follow us on social media and join our Discord server: + +- [Discord](https://discord.com/invite/VZdSuUg4pS) +- [Twitter](https://twitter.com/TaosData) +- [LinkedIn](https://www.linkedin.com/company/tdengine/) +- [YouTube](https://www.youtube.com/channel/UCmp-1U6GS_3V3hjir6Uq5DQ) diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 52e2c0944a78ed0114001494e2edd73e3e88049d..39f243d1478a7a947c13d8e6aefe9906ea202790 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 8207c74 + GIT_TAG cf1df1c SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/zh/07-develop/06-stream.md b/docs/zh/07-develop/06-stream.md index d5296582d500e3271130bc1bfc6de34492133a8a..c9f1b1d43a3b8666e747db80b5e34473f45f0675 100644 --- a/docs/zh/07-develop/06-stream.md +++ b/docs/zh/07-develop/06-stream.md @@ -52,7 +52,7 @@ CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3); ### 创建流 ```sql -create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 interval (5s); +create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s); ``` ### 写入数据 @@ -71,7 +71,7 @@ insert into d1004 values("2018-10-03 14:38:06.500", 11.50000, 221, 0.35000); ```sql taos> select start, end, max_current from current_stream_output_stb; - start | end | max_current | + start | wend | max_current | =========================================================================== 2018-10-03 14:38:05.000 | 2018-10-03 14:38:10.000 | 10.30000 | 2018-10-03 14:38:15.000 | 2018-10-03 14:38:20.000 | 12.60000 | diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 07994d65d262eba7cabb4c575e8d77b5efa69245..a531eeffdee9dbd94318ec73c08f076311f1bb3b 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -165,7 +165,8 @@ typedef struct SVnodeModifyLogicNode { typedef struct SExchangeLogicNode { SLogicNode node; - int32_t srcGroupId; + int32_t srcStartGroupId; + int32_t srcEndGroupId; } SExchangeLogicNode; typedef struct SMergeLogicNode { @@ -400,7 +401,10 @@ typedef struct SDownstreamSourceNode { typedef struct SExchangePhysiNode { SPhysiNode node; - int32_t srcGroupId; // group id of datasource suplans + // for set operators, there will be multiple execution groups under one exchange, and the ids of these execution + // groups are consecutive + int32_t srcStartGroupId; + int32_t srcEndGroupId; bool singleChannel; SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhysiNode; diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 24ade017d37f73e83b09578ea5b2211c5b2c43a5..f549ef84f80012b7969a2776e87a18a9e94120ae 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -291,25 +291,25 @@ _query: tDecoderClear(&dc); goto _exit; } - { // Traverse to find the previous qualified data - TBC *pCur; + { // Traverse to find the previous qualified data + TBC *pCur; tdbTbcOpen(pMeta->pTbDb, &pCur, NULL); STbDbKey key = {.version = sver, .uid = INT64_MAX}; - int c = 0; + int c = 0; tdbTbcMoveTo(pCur, &key, sizeof(key), &c); - if(c < 0){ + if (c < 0) { tdbTbcMoveToPrev(pCur); } void *pKey = NULL; void *pVal = NULL; int vLen = 0, kLen = 0; - while(1){ + while (1) { int32_t ret = tdbTbcPrev(pCur, &pKey, &kLen, &pVal, &vLen); if (ret < 0) break; - STbDbKey *tmp = (STbDbKey*)pKey; - if(tmp->uid != uid){ + STbDbKey *tmp = (STbDbKey *)pKey; + if (tmp->uid != uid) { continue; } SDecoder dcNew = {0}; @@ -662,12 +662,13 @@ int64_t metaGetTbNum(SMeta *pMeta) { // N.B. Called by statusReq per second int64_t metaGetTimeSeriesNum(SMeta *pMeta) { // sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column) - if (pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 || ++pMeta->pVnode->config.vndStats.itvTimeSeries % 60 == 0) { + if (pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 || + ++pMeta->pVnode->config.vndStats.itvTimeSeries % (60 * 5) == 0) { int64_t num = 0; vnodeGetTimeSeriesNum(pMeta->pVnode, &num); pMeta->pVnode->config.vndStats.numOfTimeSeries = num; - pMeta->pVnode->config.vndStats.itvTimeSeries = 0; + pMeta->pVnode->config.vndStats.itvTimeSeries = (TD_VID(pMeta->pVnode) % 100) * 2; } return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 62f8debccb8ff9c478de0fb331cc5741b503b011..c55e1059cf8842dccd7d6553d9cff5448dbc5ba5 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -25,17 +25,17 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1; - } else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){ + } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid); if (tEncodeI32(pEncoder, size) < 0) return -1; - void *pIter = NULL; + void* pIter = NULL; pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter); - while(pIter){ - int64_t *tbUid = (int64_t *)taosHashGetKey(pIter, NULL); + while (pIter) { + int64_t* tbUid = (int64_t*)taosHashGetKey(pIter, NULL); if (tEncodeI64(pEncoder, *tbUid) < 0) return -1; pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter); } - } else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){ + } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1; } tEndEncode(pEncoder); @@ -52,17 +52,17 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1; - }else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){ + } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); int32_t size = 0; if (tDecodeI32(pDecoder, &size) < 0) return -1; - for(int32_t i = 0; i < size; i++){ + for (int32_t i = 0; i < size; i++) { int64_t tbUid = 0; if (tDecodeI64(pDecoder, &tbUid) < 0) return -1; taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); } - } else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){ + } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1; } tEndDecode(pDecoder); @@ -117,7 +117,7 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_ return -1; } - if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) { + if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, &txn) < 0) { return -1; } @@ -284,7 +284,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { }; if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - handle.execHandle.task = qCreateQueueExecTaskInfo( handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); ASSERT(handle.execHandle.task); @@ -297,9 +296,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); - buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext)); - handle.execHandle.task = - qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); + buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, + (SSnapContext**)(&reader.sContext)); + handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); @@ -314,9 +313,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList); taosArrayDestroy(tbUidList); - buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext)); - handle.execHandle.task = - qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); + buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, + handle.fetchMeta, (SSnapContext**)(&reader.sContext)); + handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); } tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode)); taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ca6f6346a603059df24e70143f1f39832de1bcf8..681b25f67684f9846da940f10c4a24e696985acb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1287,14 +1287,14 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons if (p->version >= pBlock->minVer) { if (i < num - 1) { TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1); - if (i + 1 == num - 1) { // pnext is the last point +// if (i + 1 == num - 1) { // pnext is the last point if (pnext->ts >= pBlock->minKey.ts) { return true; - } - } else { - if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVer) { - return true; - } +// } +// } else { +// if (pnext->ts >= pBlock->minKey.ts) { +// return true; +// } } } else { // it must be the last point ASSERT(p->version == 0); diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index b50aab9b1e63773770ce0e491eda109f530a95f7..c5de44e1f67d3e4843a0357be88bbac730fc2d99 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -777,9 +777,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: { SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode; - SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcGroupId, sizeof(pExchNode->srcGroupId)); + SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcStartGroupId, sizeof(pExchNode->srcStartGroupId)); if (NULL == group) { - qError("exchange src group %d not in groupHash", pExchNode->srcGroupId); + qError("exchange src group %d not in groupHash", pExchNode->srcStartGroupId); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -814,7 +814,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } } - QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcGroupId, level + 1, pExchNode->singleChannel)); + QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1, pExchNode->singleChannel)); break; } case QUERY_NODE_PHYSICAL_PLAN_SORT: { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index f353efcf3ccdb58e91f6a1dc1bc9da78792401e4..c145952778aa17fb660a4732e2c0d6c068de521a 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -429,7 +429,8 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi static int32_t logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); - COPY_SCALAR_FIELD(srcGroupId); + COPY_SCALAR_FIELD(srcStartGroupId); + COPY_SCALAR_FIELD(srcEndGroupId); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 3242f20830662e2b0558790aee746d8c00dc09a7..af6c1757a9173dd0b9aeb649b07a3c7edcbc3002 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -726,14 +726,18 @@ static int32_t jsonToLogicVnodeModifyNode(const SJson* pJson, void* pObj) { return code; } -static const char* jkExchangeLogicPlanSrcGroupId = "SrcGroupId"; +static const char* jkExchangeLogicPlanSrcStartGroupId = "SrcStartGroupId"; +static const char* jkExchangeLogicPlanSrcEndGroupId = "SrcEndGroupId"; static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) { const SExchangeLogicNode* pNode = (const SExchangeLogicNode*)pObj; int32_t code = logicPlanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcGroupId, pNode->srcGroupId); + code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcStartGroupId, pNode->srcStartGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcEndGroupId, pNode->srcEndGroupId); } return code; @@ -744,7 +748,10 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) { int32_t code = jsonToLogicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcGroupId, &pNode->srcGroupId); + code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcStartGroupId, &pNode->srcStartGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcEndGroupId, &pNode->srcEndGroupId); } return code; @@ -1837,7 +1844,8 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) { return code; } -static const char* jkExchangePhysiPlanSrcGroupId = "SrcGroupId"; +static const char* jkExchangePhysiPlanSrcStartGroupId = "SrcStartGroupId"; +static const char* jkExchangePhysiPlanSrcEndGroupId = "SrcEndGroupId"; static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints"; static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) { @@ -1845,7 +1853,10 @@ static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) { int32_t code = physicPlanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcGroupId, pNode->srcGroupId); + code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcStartGroupId, pNode->srcStartGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcEndGroupId, pNode->srcEndGroupId); } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints); @@ -1859,7 +1870,10 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) { int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcGroupId, &pNode->srcGroupId); + code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcStartGroupId, &pNode->srcStartGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcEndGroupId, &pNode->srcEndGroupId); } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints); diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 2249cfe888946e0ec637c23bf5be34ce57853f61..0c6b11a765ca45311968929f035d21af1742f2f3 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2380,7 +2380,8 @@ static int32_t msgToPhysiAggNode(STlvDecoder* pDecoder, void* pObj) { enum { PHY_EXCHANGE_CODE_BASE_NODE = 1, - PHY_EXCHANGE_CODE_SRC_GROUP_ID, + PHY_EXCHANGE_CODE_SRC_START_GROUP_ID, + PHY_EXCHANGE_CODE_SRC_END_GROUP_ID, PHY_EXCHANGE_CODE_SINGLE_CHANNEL, PHY_EXCHANGE_CODE_SRC_ENDPOINTS }; @@ -2390,7 +2391,10 @@ static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { int32_t code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_GROUP_ID, pNode->srcGroupId); + code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_START_GROUP_ID, pNode->srcStartGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_END_GROUP_ID, pNode->srcEndGroupId); } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_EXCHANGE_CODE_SINGLE_CHANNEL, pNode->singleChannel); @@ -2412,8 +2416,11 @@ static int32_t msgToPhysiExchangeNode(STlvDecoder* pDecoder, void* pObj) { case PHY_EXCHANGE_CODE_BASE_NODE: code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); break; - case PHY_EXCHANGE_CODE_SRC_GROUP_ID: - code = tlvDecodeI32(pTlv, &pNode->srcGroupId); + case PHY_EXCHANGE_CODE_SRC_START_GROUP_ID: + code = tlvDecodeI32(pTlv, &pNode->srcStartGroupId); + break; + case PHY_EXCHANGE_CODE_SRC_END_GROUP_ID: + code = tlvDecodeI32(pTlv, &pNode->srcEndGroupId); break; case PHY_EXCHANGE_CODE_SINGLE_CHANNEL: code = tlvDecodeBool(pTlv, &pNode->singleChannel); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index b0177e61ed290e8ef687e5de8c54c824545d8103..ec7050fdbc3dc1b9b13762182232322934082840 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1046,7 +1046,8 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic return TSDB_CODE_OUT_OF_MEMORY; } - pExchange->srcGroupId = pExchangeLogicNode->srcGroupId; + pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId; + pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId; *pPhyNode = (SPhysiNode*)pExchange; return TSDB_CODE_SUCCESS; @@ -1425,7 +1426,8 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) { if (NULL == pExchange) { return TSDB_CODE_OUT_OF_MEMORY; } - pExchange->srcGroupId = pMerge->srcGroupId; + pExchange->srcStartGroupId = pMerge->srcGroupId; + pExchange->srcEndGroupId = pMerge->srcGroupId; pExchange->singleChannel = true; pExchange->node.pParent = (SPhysiNode*)pMerge; pExchange->node.pOutputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 106e6741522bba0a91f0a7763f313f7211c24f46..74ed3b57a4417aa43662bb722d3b78f3b13ee002 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -84,7 +84,8 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE if (NULL == pExchange) { return TSDB_CODE_OUT_OF_MEMORY; } - pExchange->srcGroupId = pCxt->groupId; + pExchange->srcStartGroupId = pCxt->groupId; + pExchange->srcEndGroupId = pCxt->groupId; pExchange->node.precision = pChild->precision; pExchange->node.pTargets = nodesCloneList(pChild->pTargets); if (NULL == pExchange->node.pTargets) { @@ -112,7 +113,8 @@ static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubpla static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) { if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) { - return ((SExchangeLogicNode*)pLogicNode)->srcGroupId == groupId; + return groupId >= ((SExchangeLogicNode*)pLogicNode)->srcStartGroupId && + groupId <= ((SExchangeLogicNode*)pLogicNode)->srcEndGroupId; } if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pLogicNode)) { @@ -1184,6 +1186,7 @@ static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubpl if (TSDB_CODE_SUCCESS != code) { break; } + ++(pCxt->groupId); } if (TSDB_CODE_SUCCESS == code) { nodesDestroyList(pSubplanChildren); @@ -1207,12 +1210,14 @@ static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, return false; } -static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) { +static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan, + SProjectLogicNode* pProject) { SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE); if (NULL == pExchange) { return TSDB_CODE_OUT_OF_MEMORY; } - pExchange->srcGroupId = pCxt->groupId; + pExchange->srcStartGroupId = startGroupId; + pExchange->srcEndGroupId = pCxt->groupId - 1; pExchange->node.precision = pProject->node.precision; pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets); if (NULL == pExchange->node.pTargets) { @@ -1246,11 +1251,11 @@ static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { return TSDB_CODE_SUCCESS; } + int32_t startGroupId = pCxt->groupId; int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject); if (TSDB_CODE_SUCCESS == code) { - code = unAllSplCreateExchangeNode(pCxt, info.pSubplan, info.pProject); + code = unAllSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pProject); } - ++(pCxt->groupId); pCxt->split = true; return code; } @@ -1260,12 +1265,14 @@ typedef struct SUnionDistinctSplitInfo { SLogicSubplan* pSubplan; } SUnionDistinctSplitInfo; -static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) { +static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan, + SAggLogicNode* pAgg) { SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE); if (NULL == pExchange) { return TSDB_CODE_OUT_OF_MEMORY; } - pExchange->srcGroupId = pCxt->groupId; + pExchange->srcStartGroupId = startGroupId; + pExchange->srcEndGroupId = pCxt->groupId - 1; pExchange->node.precision = pAgg->node.precision; pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys); if (NULL == pExchange->node.pTargets) { @@ -1293,11 +1300,11 @@ static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) return TSDB_CODE_SUCCESS; } + int32_t startGroupId = pCxt->groupId; int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg); if (TSDB_CODE_SUCCESS == code) { - code = unDistSplCreateExchangeNode(pCxt, info.pSubplan, info.pAgg); + code = unDistSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pAgg); } - ++(pCxt->groupId); pCxt->split = true; return code; } @@ -1430,7 +1437,7 @@ static const SSplitRule splitRuleSet[] = { {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit}, {.pName = "UnionAllSplit", .splitFunc = unionAllSplit}, {.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit}, - {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, + {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, // not used yet {.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit} }; // clang-format on diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index e4f02f12e698768f559bad30ef2a951022bb998e..53549c122d958bbb043f2e9c885d91ea2b742ad1 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -63,7 +63,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDownstreamSourceNode* pSource) { if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode; - if (pExchange->srcGroupId == groupId) { + if (groupId >= pExchange->srcStartGroupId && groupId <= pExchange->srcEndGroupId) { return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode((SNode*)pSource)); } } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == nodeType(pNode)) { diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 150dde91e0b048af0ba440e90cdf4ce42d7b93f2..3a3226359998d23da690aa361c5dffd1ce7d12fc 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -191,6 +191,7 @@ typedef struct SQWorker { SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx SMsgCb msgCb; SQWStat stat; + int32_t *destroyed; } SQWorker; typedef struct SQWorkerMgmt { diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 581eea4a81f90c01333233253d99d8c21f8e1a64..9f1a9a3146b0aeba875d051d731f750779b94e42 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -485,6 +485,8 @@ void qwDestroyImpl(void *pMgmt) { } taosHashCleanup(mgmt->schHash); + *mgmt->destroyed = 1; + taosMemoryFree(mgmt); atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 293f6e07b03600c2b082d26148c6a0c0e24a8662..3c102938d02a74d7beda2e95e21b34b4f844fae5 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1105,10 +1105,17 @@ void qWorkerDestroy(void **qWorkerMgmt) { return; } + int32_t destroyed = 0; SQWorker *mgmt = *qWorkerMgmt; - + mgmt->destroyed = &destroyed; + if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) { qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId); + return; + } + + while (0 == destroyed) { + taosMsleep(2); } } diff --git a/source/libs/scalar/inc/filterInt.h b/source/libs/scalar/inc/filterInt.h index 5d243d86e872be722eeaf885ec5a0ff9d3f6108e..ef3d3f79f959805681d0bc0e2e0b101cd8518388 100644 --- a/source/libs/scalar/inc/filterInt.h +++ b/source/libs/scalar/inc/filterInt.h @@ -99,7 +99,7 @@ typedef struct SFilterRange { typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t); typedef int32_t(*filter_desc_compare_func)(const void *, const void *); -typedef bool(*filter_exec_func)(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols); +typedef bool(*filter_exec_func)(void*, int32_t, SColumnInfoData*, SColumnDataAgg*, int16_t, int32_t*); typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char*, void **); typedef struct SFilterRangeCompare { diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index dc6b505bb957f09de765181d6da8e6b71211eb56..2ad06249ca930c5efdb7ac4e2406e3b7dae13dd4 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3067,15 +3067,16 @@ _return: return TSDB_CODE_SUCCESS; } -static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) { +static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols, int32_t* numOfQualified) { return true; } -static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) { +static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols, int32_t* numOfQualified) { return false; } -static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { +static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, + SColumnDataAgg *statis, int16_t numOfCols, int32_t *numOfQualified) { SFilterInfo *info = (SFilterInfo *)pinfo; bool all = true; @@ -3097,7 +3098,9 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, return all; } -static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { + +static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, + SColumnDataAgg *statis, int16_t numOfCols, int32_t *numOfQualified) { SFilterInfo *info = (SFilterInfo *)pinfo; bool all = true; @@ -3120,7 +3123,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows return all; } -bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { +bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols, int32_t* numOfQualified) { SFilterInfo *info = (SFilterInfo *)pinfo; bool all = true; uint16_t dataSize = info->cunits[0].dataSize; @@ -3136,8 +3139,9 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRe int8_t* p = (int8_t*) pRes->pData; for (int32_t i = 0; i < numOfRows; ++i) { - void *colData = colDataGetData((SColumnInfoData *)info->cunits[0].colData, i); SColumnInfoData* pData = info->cunits[0].colData; + + void *colData = colDataGetData(pData, i); if (colData == NULL || colDataIsNull_s(pData, i)) { all = false; continue; @@ -3147,13 +3151,16 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRe if (p[i] == 0) { all = false; + } else { + (*numOfQualified)++; } } return all; } -bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { +bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, SColumnDataAgg *statis, + int16_t numOfCols, int32_t *numOfQualified) { SFilterInfo *info = (SFilterInfo *)pinfo; bool all = true; @@ -3195,8 +3202,8 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes return all; } - -bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { +bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, SColumnDataAgg *statis, int16_t numOfCols, + int32_t *numOfQualified) { SFilterInfo *info = (SFilterInfo *)pinfo; bool all = true; @@ -4048,7 +4055,7 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData** p, SC *p = output.columnData; output.numOfRows = pSrc->info.rows; - bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols); + bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified); // todo this should be return during filter procedure int32_t num = 0; diff --git a/tests/script/tsim/tmq/topic.sim b/tests/script/tsim/tmq/topic.sim index 602b1064720636b504a3baa302cdd418c3cabe89..cb1e74798ea072a03d2ea2570a5a245222b5c8c5 100644 --- a/tests/script/tsim/tmq/topic.sim +++ b/tests/script/tsim/tmq/topic.sim @@ -72,15 +72,31 @@ sql create topic topic_ntb_column as select ts, c3 from ntb0 sql create topic topic_ntb_all as select * from ntb0 sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start -print == show topics sql show topics if $rows != 9 then return -1 endi -print == drop topic sql drop topic topic_stb_column + +sql show topics +if $rows != 8 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start + +print == show topics +sql show topics +if $rows != 8 then + return -1 +endi + +print == drop topic sql drop topic topic_ctb_column sql drop topic topic_ntb_column @@ -90,4 +106,4 @@ if $rows != 6 then return -1 endi - +system sh/exec.sh -n dnode1 -s stop -x SIGINT