From 3aab21c0fa24fd7a162c4a9843951c5c7426d07d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 19 Mar 2022 00:13:07 +0800 Subject: [PATCH] [td-13039] fix bug in show tables/stables. --- include/common/tdatablock.h | 13 ++- source/common/src/tdatablock.c | 50 +++++++++++ source/dnode/mnode/impl/src/mndDb.c | 4 +- source/libs/executor/inc/executorimpl.h | 8 +- source/libs/executor/src/executorimpl.c | 112 ++++++++++++++---------- 5 files changed, 131 insertions(+), 56 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index c2249f408a..a9a056aab8 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -87,8 +87,8 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u : ((p1_)->pData + ((r_) * (p1_)->info.bytes))) int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); -int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, - uint32_t numOfRow2); +int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2); +int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); @@ -97,14 +97,13 @@ void colDataTrim(SColumnInfoData* pColumnInfoData); size_t blockDataGetNumOfCols(const SSDataBlock* pBlock); size_t blockDataGetNumOfRows(const SSDataBlock* pBlock); -int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc); -int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, - int32_t pageSize); -SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount); - +int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc); +int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize); int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); +SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount); + size_t blockDataGetSize(const SSDataBlock* pBlock); size_t blockDataGetRowSize(const SSDataBlock* pBlock); double blockDataGetSerialRowSize(const SSDataBlock* pBlock); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 4070224ab8..35c9e963e6 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -239,6 +239,56 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co return numOfRow1 + numOfRow2; } +int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows) { + ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type); + if (numOfRows == 0) { + return numOfRows; + } + + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + // Handle the bitmap + char* p = realloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * numOfRows); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pColumnInfoData->varmeta.offset = (int32_t*) p; + memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows); + + if (pColumnInfoData->varmeta.allocLen < pSource->varmeta.length) { + char* tmp = realloc(pColumnInfoData->pData, pSource->varmeta.length); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pColumnInfoData->pData = tmp; + pColumnInfoData->varmeta.allocLen = pSource->varmeta.length; + } + + memcpy(pColumnInfoData->pData, pSource->pData, pSource->varmeta.length); + pColumnInfoData->varmeta.length = pSource->varmeta.length; + } else { + char* tmp = realloc(pColumnInfoData->nullbitmap, BitmapLen(numOfRows)); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pColumnInfoData->nullbitmap = tmp; + memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows)); + + int32_t newSize = numOfRows * pColumnInfoData->info.bytes; + tmp = realloc(pColumnInfoData->pData, newSize); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pColumnInfoData->pData = tmp; + memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows); + } + + return 0; +} + size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { ASSERT(pBlock); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index ba37f67676..c6b93ad7b1 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1447,8 +1447,8 @@ static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_ STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2); cols++; - pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); - *(int8_t *)pWrite = pDb->cfg.update; +// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); +// *(int8_t *)pWrite = pDb->cfg.update; } static void setInformationSchemaDbCfg(SDbObj* pDbObj) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 726e02fc4e..b7006ed8ad 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -440,8 +440,10 @@ typedef struct SSysTableScanInfo { tsem_t ready; SNode* pCondition; // db_name filter condition, to discard data that are not in current database - void *pCur; // cursor for iterate the local table meta store. - int32_t type; // show type, TODO remove it + void *pCur; // cursor for iterate the local table meta store. + SArray *scanCols; // SArray scan column id list + + int32_t type; // show type, TODO remove it SName name; SSDataBlock* pRes; int32_t capacity; @@ -628,7 +630,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SNode* pCondition, SEpSet epset, +SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4be896bebf..f3e04352fa 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1221,9 +1221,9 @@ static void projectApplyFunctions(SSDataBlock* pResult, SqlFunctionCtx *pCtx, in for (int32_t k = 0; k < numOfOutput; ++k) { if (pCtx[k].fpSet.init == NULL) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k); - memcpy(pColInfoData->pData, pCtx[k].input.pData[0]->pData, colDataGetLength(pColInfoData, pCtx[k].input.numOfRows)); + colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows); } else { // TODO: arithmetic and other process. - + ASSERT(0); } } @@ -4937,16 +4937,36 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf return TSDB_CODE_SUCCESS; } +// TODO if only one or two columnss required, how to extract data? static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, - int32_t numOfOutput, int64_t startTs, uint64_t* total) { + int32_t numOfOutput, int64_t startTs, uint64_t* total, SArray* pColList) { blockDataEnsureCapacity(pRes, numOfRows); - for (int32_t i = 0; i < numOfOutput; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); + if (pColList == NULL) { + for (int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); + + for (int32_t j = 0; j < numOfRows; ++j) { + colDataAppend(pColInfoData, j, pData, false); + pData += pColInfoData->info.bytes; + } + } + } else { // extract data acording to pColList + ASSERT(numOfOutput == taosArrayGetSize(pColList)); + for(int32_t i = 0; i < numOfOutput; ++i) { - for(int32_t j = 0; j < numOfRows; ++j) { - colDataAppend(pColInfoData, j, pData, false); - pData += pColInfoData->info.bytes; + for(int32_t j = 0; j < numOfOutput; ++j) { + int16_t colIndex = *(int16_t*) taosArrayGet(pColList, j); + if (colIndex - 1 == i) { + SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, j); + + for (int32_t k = 0; k < numOfRows; ++k) { + colDataAppend(pColInfoData, k, pData, false); + pData += pColInfoData->info.bytes; + } + break; + } + } } } @@ -5016,7 +5036,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, - pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows); + pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows, NULL); if (code != 0) { goto _error; } @@ -5118,7 +5138,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { SSDataBlock* pRes = pExchangeInfo->pResult; SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; int32_t code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, - pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows); + pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows, NULL); if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 @@ -5460,12 +5480,17 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { pInfo->pCur = metaOpenTbCursor(pInfo->readHandle); } + blockDataClearup(pInfo->pRes, true); + SColumnInfoData* pTableNameCol = taosArrayGet(pInfo->pRes->pDataBlock, 0); char * name = NULL; int32_t numOfRows = 0; + + char n[TSDB_TABLE_NAME_LEN] = {0}; while ((name = metaTbCursorNext(pInfo->pCur)) != NULL) { - colDataAppend(pTableNameCol, numOfRows, name, false); + STR_TO_VARSTR(n, name); + colDataAppend(pTableNameCol, numOfRows, n, false); numOfRows += 1; if (numOfRows >= pInfo->capacity) { break; @@ -5518,7 +5543,7 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, - pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL); + pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL, pInfo->scanCols); return doFilterResult(pInfo); } @@ -5527,7 +5552,7 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { } SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, - SNode* pCondition, SEpSet epset, SExecTaskInfo* pTaskInfo) { + SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo) { SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5540,6 +5565,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB pInfo->pRes = pResBlock; pInfo->capacity = 4096; pInfo->pCondition = pCondition; + pInfo->scanCols = colList; // TODO remove it int32_t tableType = 0; @@ -5582,9 +5608,31 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB } else { tsem_init(&pInfo->ready, 0, 0); pInfo->epSet = epset; - } - pInfo->readHandle = pSysTableReadHandle; +#if 1 + { // todo refactor + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "DB-META"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = qProcessFetchRsp; + rpcInit.sessions = tsMaxConnections; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.user = (char *)"root"; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6"; + + pInfo->pTransporter = rpcOpen(&rpcInit); + if (pInfo->pTransporter == NULL) { + return NULL; // todo + } + } +#endif + } + pOperator->name = "SysTableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN; pOperator->blockingOptr = false; @@ -5595,29 +5643,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB pOperator->closeFn = destroySysTableScannerOperatorInfo; pOperator->pTaskInfo = pTaskInfo; -#if 1 - { // todo refactor - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 0; - rpcInit.label = "DB-META"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = qProcessFetchRsp; - rpcInit.sessions = tsMaxConnections; - rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.user = (char *)"root"; - rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.ckey = "key"; - rpcInit.spi = 1; - rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6"; - - pInfo->pTransporter = rpcOpen(&rpcInit); - if (pInfo->pTransporter == NULL) { - return NULL; // todo - } - } -#endif - return pOperator; } @@ -6375,15 +6400,11 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { pProjectInfo->existDataBlock = pBlock; break; } else { // init output buffer for a new group data -// for (int32_t j = 0; j < pOperator->numOfOutput; ++j) { -// aAggs[pInfo->pCtx[j].functionId].xFinalize(&pInfo->pCtx[j]); -// } initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput); } } // todo dynamic set tags - // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // if (pTableQueryInfo != NULL) { // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); @@ -6400,7 +6421,6 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { } copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); -// resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } @@ -8165,7 +8185,11 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SSystemTableScanPhysiNode * pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc); - SOperatorInfo* pOperator = createSysTableScanOperatorInfo(NULL, pResBlock, &pSysScanPhyNode->scan.tableName, pSysScanPhyNode->scan.node.pConditions, pSysScanPhyNode->mgmtEpSet, pTaskInfo); + struct SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan; + SArray* colList = extractScanColumnId(pScanNode->pScanCols); + + SOperatorInfo* pOperator = createSysTableScanOperatorInfo(pHandle->meta, pResBlock, &pScanNode->tableName, + pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList, pTaskInfo); return pOperator; } else { ASSERT(0); -- GitLab