未验证 提交 dc0e36f7 编写于 作者: D dapan1121 提交者: GitHub

Merge branch '3.0' into enh/clientPolicy

...@@ -321,7 +321,11 @@ TDengine provides abundant developing tools for users to develop on TDengine. Fo ...@@ -321,7 +321,11 @@ TDengine provides abundant developing tools for users to develop on TDengine. Fo
Please follow the [contribution guidelines](CONTRIBUTING.md) to contribute to the project. Please follow the [contribution guidelines](CONTRIBUTING.md) to contribute to the project.
# Join TDengine User Community # Join the TDengine Community
- Join [TDengine Discord Channel](https://discord.com/invite/VZdSuUg4pS?utm_id=discord) For more information about TDengine, you can follow us on social media and join our Discord server:
- Join wechat group by adding WeChat “tdengine”
- [Discord](https://discord.com/invite/VZdSuUg4pS)
- [Twitter](https://twitter.com/TaosData)
- [LinkedIn](https://www.linkedin.com/company/tdengine/)
- [YouTube](https://www.youtube.com/channel/UCmp-1U6GS_3V3hjir6Uq5DQ)
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 8207c74 GIT_TAG cf1df1c
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -52,7 +52,7 @@ CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3); ...@@ -52,7 +52,7 @@ CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3);
### 创建流 ### 创建流
```sql ```sql
create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 interval (5s); create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);
``` ```
### 写入数据 ### 写入数据
...@@ -71,7 +71,7 @@ insert into d1004 values("2018-10-03 14:38:06.500", 11.50000, 221, 0.35000); ...@@ -71,7 +71,7 @@ insert into d1004 values("2018-10-03 14:38:06.500", 11.50000, 221, 0.35000);
```sql ```sql
taos> select start, end, max_current from current_stream_output_stb; taos> select start, end, max_current from current_stream_output_stb;
start | end | max_current | start | wend | max_current |
=========================================================================== ===========================================================================
2018-10-03 14:38:05.000 | 2018-10-03 14:38:10.000 | 10.30000 | 2018-10-03 14:38:05.000 | 2018-10-03 14:38:10.000 | 10.30000 |
2018-10-03 14:38:15.000 | 2018-10-03 14:38:20.000 | 12.60000 | 2018-10-03 14:38:15.000 | 2018-10-03 14:38:20.000 | 12.60000 |
......
...@@ -165,7 +165,8 @@ typedef struct SVnodeModifyLogicNode { ...@@ -165,7 +165,8 @@ typedef struct SVnodeModifyLogicNode {
typedef struct SExchangeLogicNode { typedef struct SExchangeLogicNode {
SLogicNode node; SLogicNode node;
int32_t srcGroupId; int32_t srcStartGroupId;
int32_t srcEndGroupId;
} SExchangeLogicNode; } SExchangeLogicNode;
typedef struct SMergeLogicNode { typedef struct SMergeLogicNode {
...@@ -400,7 +401,10 @@ typedef struct SDownstreamSourceNode { ...@@ -400,7 +401,10 @@ typedef struct SDownstreamSourceNode {
typedef struct SExchangePhysiNode { typedef struct SExchangePhysiNode {
SPhysiNode node; SPhysiNode node;
int32_t srcGroupId; // group id of datasource suplans // for set operators, there will be multiple execution groups under one exchange, and the ids of these execution
// groups are consecutive
int32_t srcStartGroupId;
int32_t srcEndGroupId;
bool singleChannel; bool singleChannel;
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode; } SExchangePhysiNode;
......
...@@ -291,25 +291,25 @@ _query: ...@@ -291,25 +291,25 @@ _query:
tDecoderClear(&dc); tDecoderClear(&dc);
goto _exit; goto _exit;
} }
{ // Traverse to find the previous qualified data { // Traverse to find the previous qualified data
TBC *pCur; TBC *pCur;
tdbTbcOpen(pMeta->pTbDb, &pCur, NULL); tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
STbDbKey key = {.version = sver, .uid = INT64_MAX}; STbDbKey key = {.version = sver, .uid = INT64_MAX};
int c = 0; int c = 0;
tdbTbcMoveTo(pCur, &key, sizeof(key), &c); tdbTbcMoveTo(pCur, &key, sizeof(key), &c);
if(c < 0){ if (c < 0) {
tdbTbcMoveToPrev(pCur); tdbTbcMoveToPrev(pCur);
} }
void *pKey = NULL; void *pKey = NULL;
void *pVal = NULL; void *pVal = NULL;
int vLen = 0, kLen = 0; int vLen = 0, kLen = 0;
while(1){ while (1) {
int32_t ret = tdbTbcPrev(pCur, &pKey, &kLen, &pVal, &vLen); int32_t ret = tdbTbcPrev(pCur, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break; if (ret < 0) break;
STbDbKey *tmp = (STbDbKey*)pKey; STbDbKey *tmp = (STbDbKey *)pKey;
if(tmp->uid != uid){ if (tmp->uid != uid) {
continue; continue;
} }
SDecoder dcNew = {0}; SDecoder dcNew = {0};
...@@ -662,12 +662,13 @@ int64_t metaGetTbNum(SMeta *pMeta) { ...@@ -662,12 +662,13 @@ int64_t metaGetTbNum(SMeta *pMeta) {
// N.B. Called by statusReq per second // N.B. Called by statusReq per second
int64_t metaGetTimeSeriesNum(SMeta *pMeta) { int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
// sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column) // sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column)
if (pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 || ++pMeta->pVnode->config.vndStats.itvTimeSeries % 60 == 0) { if (pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 ||
++pMeta->pVnode->config.vndStats.itvTimeSeries % (60 * 5) == 0) {
int64_t num = 0; int64_t num = 0;
vnodeGetTimeSeriesNum(pMeta->pVnode, &num); vnodeGetTimeSeriesNum(pMeta->pVnode, &num);
pMeta->pVnode->config.vndStats.numOfTimeSeries = num; pMeta->pVnode->config.vndStats.numOfTimeSeries = num;
pMeta->pVnode->config.vndStats.itvTimeSeries = 0; pMeta->pVnode->config.vndStats.itvTimeSeries = (TD_VID(pMeta->pVnode) % 100) * 2;
} }
return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries; return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
......
...@@ -25,17 +25,17 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { ...@@ -25,17 +25,17 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1; if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1;
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){ } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid); int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
if (tEncodeI32(pEncoder, size) < 0) return -1; if (tEncodeI32(pEncoder, size) < 0) return -1;
void *pIter = NULL; void* pIter = NULL;
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter); pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
while(pIter){ while (pIter) {
int64_t *tbUid = (int64_t *)taosHashGetKey(pIter, NULL); int64_t* tbUid = (int64_t*)taosHashGetKey(pIter, NULL);
if (tEncodeI64(pEncoder, *tbUid) < 0) return -1; if (tEncodeI64(pEncoder, *tbUid) < 0) return -1;
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter); pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
} }
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){ } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1; if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
} }
tEndEncode(pEncoder); tEndEncode(pEncoder);
...@@ -52,17 +52,17 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { ...@@ -52,17 +52,17 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1; if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
}else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){ } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
pHandle->execHandle.execDb.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
int32_t size = 0; int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1; if (tDecodeI32(pDecoder, &size) < 0) return -1;
for(int32_t i = 0; i < size; i++){ for (int32_t i = 0; i < size; i++) {
int64_t tbUid = 0; int64_t tbUid = 0;
if (tDecodeI64(pDecoder, &tbUid) < 0) return -1; if (tDecodeI64(pDecoder, &tbUid) < 0) return -1;
taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
} }
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){ } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1; if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
} }
tEndDecode(pDecoder); tEndDecode(pDecoder);
...@@ -117,7 +117,7 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_ ...@@ -117,7 +117,7 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_
return -1; return -1;
} }
if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) { if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, &txn) < 0) {
return -1; return -1;
} }
...@@ -284,7 +284,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -284,7 +284,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
}; };
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
handle.execHandle.task = qCreateQueueExecTaskInfo( handle.execHandle.task = qCreateQueueExecTaskInfo(
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
ASSERT(handle.execHandle.task); ASSERT(handle.execHandle.task);
...@@ -297,9 +296,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -297,9 +296,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext)); buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
handle.execHandle.task = (SSnapContext**)(&reader.sContext));
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
...@@ -314,9 +313,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -314,9 +313,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList); tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext)); buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
handle.execHandle.task = handle.fetchMeta, (SSnapContext**)(&reader.sContext));
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
} }
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode)); tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
......
...@@ -1287,14 +1287,14 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons ...@@ -1287,14 +1287,14 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons
if (p->version >= pBlock->minVer) { if (p->version >= pBlock->minVer) {
if (i < num - 1) { if (i < num - 1) {
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1); TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
if (i + 1 == num - 1) { // pnext is the last point // if (i + 1 == num - 1) { // pnext is the last point
if (pnext->ts >= pBlock->minKey.ts) { if (pnext->ts >= pBlock->minKey.ts) {
return true; return true;
} // }
} else { // } else {
if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVer) { // if (pnext->ts >= pBlock->minKey.ts) {
return true; // return true;
} // }
} }
} else { // it must be the last point } else { // it must be the last point
ASSERT(p->version == 0); ASSERT(p->version == 0);
......
...@@ -777,9 +777,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -777,9 +777,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
} }
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: { case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode; SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode;
SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcGroupId, sizeof(pExchNode->srcGroupId)); SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcStartGroupId, sizeof(pExchNode->srcStartGroupId));
if (NULL == group) { if (NULL == group) {
qError("exchange src group %d not in groupHash", pExchNode->srcGroupId); qError("exchange src group %d not in groupHash", pExchNode->srcStartGroupId);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -814,7 +814,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -814,7 +814,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
} }
} }
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcGroupId, level + 1, pExchNode->singleChannel)); QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1, pExchNode->singleChannel));
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_SORT: { case QUERY_NODE_PHYSICAL_PLAN_SORT: {
......
...@@ -429,7 +429,8 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi ...@@ -429,7 +429,8 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
static int32_t logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) { static int32_t logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(srcGroupId); COPY_SCALAR_FIELD(srcStartGroupId);
COPY_SCALAR_FIELD(srcEndGroupId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -726,14 +726,18 @@ static int32_t jsonToLogicVnodeModifyNode(const SJson* pJson, void* pObj) { ...@@ -726,14 +726,18 @@ static int32_t jsonToLogicVnodeModifyNode(const SJson* pJson, void* pObj) {
return code; return code;
} }
static const char* jkExchangeLogicPlanSrcGroupId = "SrcGroupId"; static const char* jkExchangeLogicPlanSrcStartGroupId = "SrcStartGroupId";
static const char* jkExchangeLogicPlanSrcEndGroupId = "SrcEndGroupId";
static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) { static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) {
const SExchangeLogicNode* pNode = (const SExchangeLogicNode*)pObj; const SExchangeLogicNode* pNode = (const SExchangeLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson); int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcGroupId, pNode->srcGroupId); code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcStartGroupId, pNode->srcStartGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcEndGroupId, pNode->srcEndGroupId);
} }
return code; return code;
...@@ -744,7 +748,10 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) { ...@@ -744,7 +748,10 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToLogicPlanNode(pJson, pObj); int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcGroupId, &pNode->srcGroupId); code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcStartGroupId, &pNode->srcStartGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcEndGroupId, &pNode->srcEndGroupId);
} }
return code; return code;
...@@ -1837,7 +1844,8 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) { ...@@ -1837,7 +1844,8 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
return code; return code;
} }
static const char* jkExchangePhysiPlanSrcGroupId = "SrcGroupId"; static const char* jkExchangePhysiPlanSrcStartGroupId = "SrcStartGroupId";
static const char* jkExchangePhysiPlanSrcEndGroupId = "SrcEndGroupId";
static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints"; static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints";
static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
...@@ -1845,7 +1853,10 @@ static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1845,7 +1853,10 @@ static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
int32_t code = physicPlanNodeToJson(pObj, pJson); int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcGroupId, pNode->srcGroupId); code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcStartGroupId, pNode->srcStartGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcEndGroupId, pNode->srcEndGroupId);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints); code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints);
...@@ -1859,7 +1870,10 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) { ...@@ -1859,7 +1870,10 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToPhysicPlanNode(pJson, pObj); int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcGroupId, &pNode->srcGroupId); code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcStartGroupId, &pNode->srcStartGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcEndGroupId, &pNode->srcEndGroupId);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints); code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints);
......
...@@ -2380,7 +2380,8 @@ static int32_t msgToPhysiAggNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -2380,7 +2380,8 @@ static int32_t msgToPhysiAggNode(STlvDecoder* pDecoder, void* pObj) {
enum { enum {
PHY_EXCHANGE_CODE_BASE_NODE = 1, PHY_EXCHANGE_CODE_BASE_NODE = 1,
PHY_EXCHANGE_CODE_SRC_GROUP_ID, PHY_EXCHANGE_CODE_SRC_START_GROUP_ID,
PHY_EXCHANGE_CODE_SRC_END_GROUP_ID,
PHY_EXCHANGE_CODE_SINGLE_CHANNEL, PHY_EXCHANGE_CODE_SINGLE_CHANNEL,
PHY_EXCHANGE_CODE_SRC_ENDPOINTS PHY_EXCHANGE_CODE_SRC_ENDPOINTS
}; };
...@@ -2390,7 +2391,10 @@ static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -2390,7 +2391,10 @@ static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
int32_t code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_BASE_NODE, physiNodeToMsg, &pNode->node); int32_t code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_BASE_NODE, physiNodeToMsg, &pNode->node);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_GROUP_ID, pNode->srcGroupId); code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_START_GROUP_ID, pNode->srcStartGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_END_GROUP_ID, pNode->srcEndGroupId);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_EXCHANGE_CODE_SINGLE_CHANNEL, pNode->singleChannel); code = tlvEncodeBool(pEncoder, PHY_EXCHANGE_CODE_SINGLE_CHANNEL, pNode->singleChannel);
...@@ -2412,8 +2416,11 @@ static int32_t msgToPhysiExchangeNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -2412,8 +2416,11 @@ static int32_t msgToPhysiExchangeNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_EXCHANGE_CODE_BASE_NODE: case PHY_EXCHANGE_CODE_BASE_NODE:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node); code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node);
break; break;
case PHY_EXCHANGE_CODE_SRC_GROUP_ID: case PHY_EXCHANGE_CODE_SRC_START_GROUP_ID:
code = tlvDecodeI32(pTlv, &pNode->srcGroupId); code = tlvDecodeI32(pTlv, &pNode->srcStartGroupId);
break;
case PHY_EXCHANGE_CODE_SRC_END_GROUP_ID:
code = tlvDecodeI32(pTlv, &pNode->srcEndGroupId);
break; break;
case PHY_EXCHANGE_CODE_SINGLE_CHANNEL: case PHY_EXCHANGE_CODE_SINGLE_CHANNEL:
code = tlvDecodeBool(pTlv, &pNode->singleChannel); code = tlvDecodeBool(pTlv, &pNode->singleChannel);
......
...@@ -1046,7 +1046,8 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic ...@@ -1046,7 +1046,8 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pExchange->srcGroupId = pExchangeLogicNode->srcGroupId; pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
*pPhyNode = (SPhysiNode*)pExchange; *pPhyNode = (SPhysiNode*)pExchange;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1425,7 +1426,8 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) { ...@@ -1425,7 +1426,8 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
if (NULL == pExchange) { if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pExchange->srcGroupId = pMerge->srcGroupId; pExchange->srcStartGroupId = pMerge->srcGroupId;
pExchange->srcEndGroupId = pMerge->srcGroupId;
pExchange->singleChannel = true; pExchange->singleChannel = true;
pExchange->node.pParent = (SPhysiNode*)pMerge; pExchange->node.pParent = (SPhysiNode*)pMerge;
pExchange->node.pOutputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc); pExchange->node.pOutputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc);
......
...@@ -84,7 +84,8 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE ...@@ -84,7 +84,8 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE
if (NULL == pExchange) { if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pExchange->srcGroupId = pCxt->groupId; pExchange->srcStartGroupId = pCxt->groupId;
pExchange->srcEndGroupId = pCxt->groupId;
pExchange->node.precision = pChild->precision; pExchange->node.precision = pChild->precision;
pExchange->node.pTargets = nodesCloneList(pChild->pTargets); pExchange->node.pTargets = nodesCloneList(pChild->pTargets);
if (NULL == pExchange->node.pTargets) { if (NULL == pExchange->node.pTargets) {
...@@ -112,7 +113,8 @@ static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubpla ...@@ -112,7 +113,8 @@ static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubpla
static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) { static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) { if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
return ((SExchangeLogicNode*)pLogicNode)->srcGroupId == groupId; return groupId >= ((SExchangeLogicNode*)pLogicNode)->srcStartGroupId &&
groupId <= ((SExchangeLogicNode*)pLogicNode)->srcEndGroupId;
} }
if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pLogicNode)) { if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pLogicNode)) {
...@@ -1184,6 +1186,7 @@ static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubpl ...@@ -1184,6 +1186,7 @@ static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubpl
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
break; break;
} }
++(pCxt->groupId);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
nodesDestroyList(pSubplanChildren); nodesDestroyList(pSubplanChildren);
...@@ -1207,12 +1210,14 @@ static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, ...@@ -1207,12 +1210,14 @@ static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan,
return false; return false;
} }
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) { static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
SProjectLogicNode* pProject) {
SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE); SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) { if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pExchange->srcGroupId = pCxt->groupId; pExchange->srcStartGroupId = startGroupId;
pExchange->srcEndGroupId = pCxt->groupId - 1;
pExchange->node.precision = pProject->node.precision; pExchange->node.precision = pProject->node.precision;
pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets); pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
if (NULL == pExchange->node.pTargets) { if (NULL == pExchange->node.pTargets) {
...@@ -1246,11 +1251,11 @@ static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { ...@@ -1246,11 +1251,11 @@ static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t startGroupId = pCxt->groupId;
int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject); int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = unAllSplCreateExchangeNode(pCxt, info.pSubplan, info.pProject); code = unAllSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pProject);
} }
++(pCxt->groupId);
pCxt->split = true; pCxt->split = true;
return code; return code;
} }
...@@ -1260,12 +1265,14 @@ typedef struct SUnionDistinctSplitInfo { ...@@ -1260,12 +1265,14 @@ typedef struct SUnionDistinctSplitInfo {
SLogicSubplan* pSubplan; SLogicSubplan* pSubplan;
} SUnionDistinctSplitInfo; } SUnionDistinctSplitInfo;
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) { static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
SAggLogicNode* pAgg) {
SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE); SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) { if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pExchange->srcGroupId = pCxt->groupId; pExchange->srcStartGroupId = startGroupId;
pExchange->srcEndGroupId = pCxt->groupId - 1;
pExchange->node.precision = pAgg->node.precision; pExchange->node.precision = pAgg->node.precision;
pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys); pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys);
if (NULL == pExchange->node.pTargets) { if (NULL == pExchange->node.pTargets) {
...@@ -1293,11 +1300,11 @@ static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) ...@@ -1293,11 +1300,11 @@ static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t startGroupId = pCxt->groupId;
int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg); int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = unDistSplCreateExchangeNode(pCxt, info.pSubplan, info.pAgg); code = unDistSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pAgg);
} }
++(pCxt->groupId);
pCxt->split = true; pCxt->split = true;
return code; return code;
} }
...@@ -1430,7 +1437,7 @@ static const SSplitRule splitRuleSet[] = { ...@@ -1430,7 +1437,7 @@ static const SSplitRule splitRuleSet[] = {
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit}, {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit}, {.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit}, {.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, // not used yet
{.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit} {.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit}
}; };
// clang-format on // clang-format on
......
...@@ -63,7 +63,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo ...@@ -63,7 +63,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDownstreamSourceNode* pSource) { static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDownstreamSourceNode* pSource) {
if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) { if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode; SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
if (pExchange->srcGroupId == groupId) { if (groupId >= pExchange->srcStartGroupId && groupId <= pExchange->srcEndGroupId) {
return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode((SNode*)pSource)); return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode((SNode*)pSource));
} }
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == nodeType(pNode)) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == nodeType(pNode)) {
......
...@@ -191,6 +191,7 @@ typedef struct SQWorker { ...@@ -191,6 +191,7 @@ typedef struct SQWorker {
SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx
SMsgCb msgCb; SMsgCb msgCb;
SQWStat stat; SQWStat stat;
int32_t *destroyed;
} SQWorker; } SQWorker;
typedef struct SQWorkerMgmt { typedef struct SQWorkerMgmt {
......
...@@ -485,6 +485,8 @@ void qwDestroyImpl(void *pMgmt) { ...@@ -485,6 +485,8 @@ void qwDestroyImpl(void *pMgmt) {
} }
taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->schHash);
*mgmt->destroyed = 1;
taosMemoryFree(mgmt); taosMemoryFree(mgmt);
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1); atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
......
...@@ -1105,10 +1105,17 @@ void qWorkerDestroy(void **qWorkerMgmt) { ...@@ -1105,10 +1105,17 @@ void qWorkerDestroy(void **qWorkerMgmt) {
return; return;
} }
int32_t destroyed = 0;
SQWorker *mgmt = *qWorkerMgmt; SQWorker *mgmt = *qWorkerMgmt;
mgmt->destroyed = &destroyed;
if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) { if (taosRemoveRef(gQwMgmt.qwRef, mgmt->refId)) {
qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId); qError("remove qw from ref list failed, refId:%" PRIx64, mgmt->refId);
return;
}
while (0 == destroyed) {
taosMsleep(2);
} }
} }
......
...@@ -99,7 +99,7 @@ typedef struct SFilterRange { ...@@ -99,7 +99,7 @@ typedef struct SFilterRange {
typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t); typedef bool (*rangeCompFunc) (const void *, const void *, const void *, const void *, __compar_fn_t);
typedef int32_t(*filter_desc_compare_func)(const void *, const void *); typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
typedef bool(*filter_exec_func)(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols); typedef bool(*filter_exec_func)(void*, int32_t, SColumnInfoData*, SColumnDataAgg*, int16_t, int32_t*);
typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char*, void **); typedef int32_t (*filer_get_col_from_name)(void *, int32_t, char*, void **);
typedef struct SFilterRangeCompare { typedef struct SFilterRangeCompare {
......
...@@ -3067,15 +3067,16 @@ _return: ...@@ -3067,15 +3067,16 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) { static FORCE_INLINE bool filterExecuteImplAll(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols, int32_t* numOfQualified) {
return true; return true;
} }
static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols) { static FORCE_INLINE bool filterExecuteImplEmpty(void *info, int32_t numOfRows, SColumnInfoData* p, SColumnDataAgg *statis, int16_t numOfCols, int32_t* numOfQualified) {
return false; return false;
} }
static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes,
SColumnDataAgg *statis, int16_t numOfCols, int32_t *numOfQualified) {
SFilterInfo *info = (SFilterInfo *)pinfo; SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true; bool all = true;
...@@ -3097,7 +3098,9 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows, ...@@ -3097,7 +3098,9 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows,
return all; return all;
} }
static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) {
static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes,
SColumnDataAgg *statis, int16_t numOfCols, int32_t *numOfQualified) {
SFilterInfo *info = (SFilterInfo *)pinfo; SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true; bool all = true;
...@@ -3120,7 +3123,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows ...@@ -3120,7 +3123,7 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
return all; return all;
} }
bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols, int32_t* numOfQualified) {
SFilterInfo *info = (SFilterInfo *)pinfo; SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true; bool all = true;
uint16_t dataSize = info->cunits[0].dataSize; uint16_t dataSize = info->cunits[0].dataSize;
...@@ -3136,8 +3139,9 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRe ...@@ -3136,8 +3139,9 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRe
int8_t* p = (int8_t*) pRes->pData; int8_t* p = (int8_t*) pRes->pData;
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
void *colData = colDataGetData((SColumnInfoData *)info->cunits[0].colData, i);
SColumnInfoData* pData = info->cunits[0].colData; SColumnInfoData* pData = info->cunits[0].colData;
void *colData = colDataGetData(pData, i);
if (colData == NULL || colDataIsNull_s(pData, i)) { if (colData == NULL || colDataIsNull_s(pData, i)) {
all = false; all = false;
continue; continue;
...@@ -3147,13 +3151,16 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRe ...@@ -3147,13 +3151,16 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData* pRe
if (p[i] == 0) { if (p[i] == 0) {
all = false; all = false;
} else {
(*numOfQualified)++;
} }
} }
return all; return all;
} }
bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, SColumnDataAgg *statis,
int16_t numOfCols, int32_t *numOfQualified) {
SFilterInfo *info = (SFilterInfo *)pinfo; SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true; bool all = true;
...@@ -3195,8 +3202,8 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes ...@@ -3195,8 +3202,8 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes
return all; return all;
} }
bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, SColumnDataAgg *statis, int16_t numOfCols,
bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData* pRes, SColumnDataAgg *statis, int16_t numOfCols) { int32_t *numOfQualified) {
SFilterInfo *info = (SFilterInfo *)pinfo; SFilterInfo *info = (SFilterInfo *)pinfo;
bool all = true; bool all = true;
...@@ -4048,7 +4055,7 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData** p, SC ...@@ -4048,7 +4055,7 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData** p, SC
*p = output.columnData; *p = output.columnData;
output.numOfRows = pSrc->info.rows; output.numOfRows = pSrc->info.rows;
bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols); bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified);
// todo this should be return during filter procedure // todo this should be return during filter procedure
int32_t num = 0; int32_t num = 0;
......
...@@ -72,15 +72,31 @@ sql create topic topic_ntb_column as select ts, c3 from ntb0 ...@@ -72,15 +72,31 @@ sql create topic topic_ntb_column as select ts, c3 from ntb0
sql create topic topic_ntb_all as select * from ntb0 sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0 sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print == show topics
sql show topics sql show topics
if $rows != 9 then if $rows != 9 then
return -1 return -1
endi endi
print == drop topic
sql drop topic topic_stb_column sql drop topic topic_stb_column
sql show topics
if $rows != 8 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print == show topics
sql show topics
if $rows != 8 then
return -1
endi
print == drop topic
sql drop topic topic_ctb_column sql drop topic topic_ctb_column
sql drop topic topic_ntb_column sql drop topic topic_ntb_column
...@@ -90,4 +106,4 @@ if $rows != 6 then ...@@ -90,4 +106,4 @@ if $rows != 6 then
return -1 return -1
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册