diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 53dddd9c228621957cdca2563cba442e27b6ca51..ec204a8f60645cc05fe83c4486a8da0f627dac20 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -401,13 +401,12 @@ typedef struct SStreamBlockScanInfo { } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { - SReadHandle readHandle; - SRetrieveMetaTableRsp* pRsp; SRetrieveTableReq req; SEpSet epSet; tsem_t ready; + SReadHandle readHandle; int32_t accountId; bool showRewrite; SNode* pCondition; // db_name filter condition, to discard data that are not in current database diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c4b3274ae16332b30f3c45f22fa8b3931fa8879e..4ed6a2de0d7c9c8d6f8ce7ba4353a3eb2aab0d6f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -306,17 +306,21 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) const char* p = NULL; if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { const uint8_t* tmp = mr.me.ctbEntry.pTags; - char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1); + + char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1); if (data == NULL) { + metaReaderClear(&mr); qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1); return; } + *data = TSDB_DATA_TYPE_JSON; memcpy(data + 1, tmp, kvRowLen(tmp)); p = data; } else { p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId); } + for (int32_t i = 0; i < pBlock->info.rows; ++i) { colDataAppend(pColInfoData, i, p, (p == NULL)); } @@ -1216,18 +1220,18 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { // retrieve local table list info from vnode const char* name = tNameGetTableName(&pInfo->name); if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + // the retrieve is executed on the mnode, so return tables that belongs to the information schema database. if (pInfo->readHandle.mnd != NULL) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity); doFilterResult(pInfo); pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; - pOperator->status = OP_EXEC_DONE; + doSetOperatorCompleted(pOperator); return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } else { if (pInfo->pCur == NULL) { @@ -1253,7 +1257,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { blockDataEnsureCapacity(p, pOperator->resultInfo.capacity); char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - while (metaTbCursorNext(pInfo->pCur) == 0) { + + int32_t ret = 0; + while ((ret = metaTbCursorNext(pInfo->pCur)) == 0) { STR_TO_VARSTR(n, pInfo->pCur->mr.me.name); // table name @@ -1336,6 +1342,13 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { } } + // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found + if (ret != 0) { + metaCloseTbCursor(pInfo->pCur); + pInfo->pCur = NULL; + doSetOperatorCompleted(pOperator); + } + p->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;