提交 b2e615d4 编写于 作者: S shenglian zhou

enhance: tag scan cursor based block

上级 10f5d223
...@@ -98,6 +98,16 @@ typedef struct SMTbCursor { ...@@ -98,6 +98,16 @@ typedef struct SMTbCursor {
int8_t paused; int8_t paused;
} SMTbCursor; } SMTbCursor;
typedef struct SMCtbCursor {
SMeta *pMeta;
void *pCur;
tb_uid_t suid;
void *pKey;
void *pVal;
int kLen;
int vLen;
} SMCtbCursor;
typedef struct SRowBuffPos { typedef struct SRowBuffPos {
void* pRowBuff; void* pRowBuff;
void* pKey; void* pKey;
...@@ -278,13 +288,15 @@ typedef struct SStoreMeta { ...@@ -278,13 +288,15 @@ typedef struct SStoreMeta {
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); // metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
int64_t (*getNumOfRowsInMem)(void* pVnode); int64_t (*getNumOfRowsInMem)(void* pVnode);
/** /**
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
*/ */
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock);
void (*closeCtbCursor)(SMCtbCursor *pCtbCur, int lock);
tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur);
} SStoreMeta; } SStoreMeta;
typedef struct SStoreMetaReader { typedef struct SStoreMetaReader {
......
...@@ -167,7 +167,7 @@ int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); ...@@ -167,7 +167,7 @@ int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq); int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
int64_t metaGetTimeSeriesNum(SMeta* pMeta); int64_t metaGetTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock); SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock); void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur); tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid); SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);
......
...@@ -408,17 +408,9 @@ _err: ...@@ -408,17 +408,9 @@ _err:
return NULL; return NULL;
} }
struct SMCtbCursor {
SMeta *pMeta;
TBC *pCur;
tb_uid_t suid;
void *pKey;
void *pVal;
int kLen;
int vLen;
};
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid, int lock) { SMCtbCursor *metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
SMCtbCursor *pCtbCur = NULL; SMCtbCursor *pCtbCur = NULL;
SCtbIdxKey ctbIdxKey; SCtbIdxKey ctbIdxKey;
int ret = 0; int ret = 0;
......
...@@ -96,6 +96,10 @@ void initMetadataAPI(SStoreMeta* pMeta) { ...@@ -96,6 +96,10 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->metaGetCachedTbGroup = metaGetCachedTbGroup; pMeta->metaGetCachedTbGroup = metaGetCachedTbGroup;
pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache; pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache;
pMeta->openCtbCursor = metaOpenCtbCursor;
pMeta->closeCtbCursor = metaCloseCtbCursor;
pMeta->ctbCursorNext = metaCtbCursorNext;
} }
void initTqAPI(SStoreTqReader* pTq) { void initTqAPI(SStoreTqReader* pTq) {
......
...@@ -190,4 +190,6 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag); ...@@ -190,4 +190,6 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag);
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order); void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery); void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI);
#endif // TDENGINE_EXECUTIL_H #endif // TDENGINE_EXECUTIL_H
...@@ -259,6 +259,10 @@ typedef struct STagScanInfo { ...@@ -259,6 +259,10 @@ typedef struct STagScanInfo {
SLimitNode* pSlimit; SLimitNode* pSlimit;
SReadHandle readHandle; SReadHandle readHandle;
STableListInfo* pTableListInfo; STableListInfo* pTableListInfo;
uint64_t suid;
void* pCtbCursor;
SNode* pTagCond;
SNode* pTagIndexCond;
} STagScanInfo; } STagScanInfo;
typedef enum EStreamScanMode { typedef enum EStreamScanMode {
......
...@@ -81,7 +81,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -81,7 +81,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SNode* pTagCond, SNode*pTagIndexCond, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
......
...@@ -47,8 +47,6 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* p ...@@ -47,8 +47,6 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* p
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI); STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI);
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
...@@ -846,7 +844,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S ...@@ -846,7 +844,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S
return -1; return -1;
} }
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI) { SStorageAPI* pStorageAPI) {
SSDataBlock* pResBlock = createDataBlock(); SSDataBlock* pResBlock = createDataBlock();
if (pResBlock == NULL) { if (pResBlock == NULL) {
......
...@@ -380,7 +380,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR ...@@ -380,7 +380,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
return NULL; return NULL;
} }
pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
......
...@@ -2688,6 +2688,271 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, ...@@ -2688,6 +2688,271 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
} }
} }
static void tagScanFreeUidTag(void* p) {
STUidTagInfo* pInfo = p;
if (pInfo->pTagVal != NULL) {
taosMemoryFree(pInfo->pTagVal);
}
}
static int32_t tagScanCreateResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
if (pColumnData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pColumnData->info.type = pType->type;
pColumnData->info.bytes = pType->bytes;
pColumnData->info.scale = pType->scale;
pColumnData->info.precision = pType->precision;
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
taosMemoryFree(pColumnData);
return terrno;
}
pParam->columnData = pColumnData;
pParam->colAlloced = true;
return TSDB_CODE_SUCCESS;
}
typedef struct STagScanFilterContext {
SHashObj* colHash;
int32_t index;
SArray* cInfoList;
} STagScanFilterContext;
static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) {
SColumnNode* pSColumnNode = NULL;
if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
pSColumnNode = *(SColumnNode**)pNode;
} else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
pSColumnNode = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pSColumnNode) {
return DEAL_RES_ERROR;
}
pSColumnNode->colId = -1;
pSColumnNode->colType = COLUMN_TYPE_TBNAME;
pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
nodesDestroyNode(*pNode);
*pNode = (SNode*)pSColumnNode;
} else {
return DEAL_RES_CONTINUE;
}
} else {
return DEAL_RES_CONTINUE;
}
STagScanFilterContext* pCtx = (STagScanFilterContext*)pContext;
void* data = taosHashGet(pCtx->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
if (!data) {
taosHashPut(pCtx->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
pSColumnNode->slotId = pCtx->index++;
SColumnInfo cInfo = {.colId = pSColumnNode->colId,
.type = pSColumnNode->node.resType.type,
.bytes = pSColumnNode->node.resType.bytes};
taosArrayPush(pCtx->cInfoList, &cInfo);
} else {
SColumnNode* col = *(SColumnNode**)data;
pSColumnNode->slotId = col->slotId;
}
return DEAL_RES_CONTINUE;
}
static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aUidTagIdxs, void* pVnode, SStorageAPI* pAPI) {
int32_t code = 0;
int32_t numOfTables = taosArrayGetSize(aUidTags);
STagScanFilterContext ctx = {0};
ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&ctx);
SSDataBlock* pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, aUidTags, pVnode, pAPI);
if (pResBlock == NULL) {
}
SArray* pBlockList = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(pBlockList, &pResBlock);
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
SScalarParam output = {0};
code = tagScanCreateResultData(&type, numOfTables, &output);
if (code != TSDB_CODE_SUCCESS) {
}
code = scalarCalculate(pTagCond, pBlockList, &output);
if (code != TSDB_CODE_SUCCESS) {
}
bool* result = (bool*)output.columnData->pData;
for (int32_t i = 0 ; i < numOfTables; ++i) {
if (result[i]) {
taosArrayPush(aUidTagIdxs, &i);
}
}
taosHashCleanup(ctx.colHash);
taosArrayDestroy(ctx.cInfoList);
blockDataDestroy(pResBlock);
taosArrayDestroy(pBlockList);
colDataDestroy(output.columnData);
taosMemoryFreeClear(output.columnData);
}
static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) {
if (fmIsScanPseudoColumnFunc(pExprInfo->pExpr->_function.functionId)) { // tbname
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(str, "zsl");
// if (pUidTagInfo->name != NULL) {
// STR_TO_VARSTR(str, pUidTagInfo->name);
// } else { // name is not retrieved during filter
// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str);
// }
colDataSetVal(pColInfo, rowIndex, str, false);
} else {
STagVal tagVal = {0};
tagVal.cid = pExprInfo->base.pParam[0].pCol->colId;
if (pUidTagInfo->pTagVal == NULL) {
colDataSetNULL(pColInfo, rowIndex);
} else {
const char* p = pAPI->metaFn.extractTagVal(pUidTagInfo->pTagVal, pColInfo->info.type, &tagVal);
if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
colDataSetNULL(pColInfo, rowIndex);
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
colDataSetVal(pColInfo, rowIndex, p, false);
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
varDataSetLen(tmp, tagVal.nData);
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
colDataSetVal(pColInfo, rowIndex, tmp, false);
taosMemoryFree(tmp);
} else {
colDataSetVal(pColInfo, rowIndex, (const char*)&tagVal.i64, false);
}
}
}
}
static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aUidTagIdxs,
SStorageAPI* pAPI) {
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
for (int i = 0; i < taosArrayGetSize(aUidTagIdxs); ++i) {
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, *(int32_t*)taosArrayGet(aUidTagIdxs, i));
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
}
}
return 0;
}
#if 0
static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags,
SStorageAPI* pAPI) {
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
int32_t nTbls = taosArrayGetSize(aUidTags);
for (int i = 0; i < nTbls; ++i) {
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, i);
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
// refactor later
if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
char str[512];
STR_TO_VARSTR(str, "zsl");
colDataSetVal(pDst, (i), str, false);
} else { // it is a tag value
STagVal val = {0};
val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
const char* p = pAPI->metaFn.extractTagVal(pUidTagInfo->pTagVal, pDst->info.type, &val);
char* data = NULL;
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
data = tTagValToData((const STagVal*)p, false);
} else {
data = (char*)p;
}
colDataSetVal(pDst, i, data,
(data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
data != NULL) {
taosMemoryFree(data);
}
}
}
}
return 0;
}
#endif
static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
int32_t count = 0;
if (pInfo->pCtbCursor == NULL) {
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
}
SArray* aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo));
SArray* aUidTagIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t));
while (1) {
while (count < pOperator->resultInfo.capacity) {
SMCtbCursor* pCur = pInfo->pCtbCursor;
tb_uid_t uid = pAPI->metaFn.ctbCursorNext(pInfo->pCtbCursor);
if (uid == 0) {
break;
}
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
info.pTagVal = taosMemoryMalloc(pCur->vLen);
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
taosArrayPush(aUidTags, &info);
}
int32_t numTables = taosArrayGetSize(aUidTags);
if (numTables != 0 && pInfo->pTagCond != NULL) {
tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aUidTagIdxs, pAPI);
}
tagScanFillResultBlock(pOperator, pRes, aUidTags, aUidTagIdxs, pAPI);
if (taosArrayGetSize(aUidTagIdxs) != 0) {
break;
}
taosArrayClearEx(aUidTags, tagScanFreeUidTag);
taosArrayClear(aUidTagIdxs);
}
taosArrayDestroy(aUidTagIdxs);
taosArrayDestroyEx(aUidTags, tagScanFreeUidTag);
pOperator->resultInfo.totalRows += count;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
...@@ -2753,7 +3018,7 @@ static void destroyTagScanOperatorInfo(void* param) { ...@@ -2753,7 +3018,7 @@ static void destroyTagScanOperatorInfo(void* param) {
} }
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -2774,7 +3039,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -2774,7 +3039,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
pInfo->pTagCond = pTagCond;
pInfo->pTagIndexCond = pTagIndexCond;
pInfo->pTableListInfo = pTableListInfo; pInfo->pTableListInfo = pTableListInfo;
pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
...@@ -2789,6 +3055,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -2789,6 +3055,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL); createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
pInfo->suid = pPhyNode->suid;
return pOperator; return pOperator;
_error: _error:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册