diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5191331370b91277a7af1eeeafaec1b3d1d320af..9c711cf8a3ee8b76907a8dc36499043c5c74b9db 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -92,8 +92,8 @@ typedef struct SIOCostSummary { typedef struct SBlockLoadSuppInfo { SColumnDataAgg* pstatis; SColumnDataAgg** plist; - int16_t* colIdList; // default load column int32_t* slotIds; // colId to slotId + char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. } SBlockLoadSuppInfo; typedef struct SFileSetIter { @@ -189,16 +189,34 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed); // pCompBlockLoadInfo->fileId = -1; // } -static int32_t setColumnIdList(STsdbReader* pReader, SSDataBlock* pBlock) { +static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { + SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - pReader->suppInfo.colIdList = taosMemoryCalloc(numOfCols, sizeof(int16_t)); - if (pReader->suppInfo.colIdList == NULL) { + pSupInfo->slotIds = taosMemoryCalloc(numOfCols, sizeof(int16_t)); + if (pSupInfo->slotIds == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES); + if (pSupInfo->buildBuf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + STSchema* pSchema = pReader->pSchema; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); - pReader->suppInfo.colIdList[i] = pCol->info.colId; + + for (int32_t j = 0; j < pSchema->numOfCols; ++j) { + if (pCol->info.colId == pSchema->columns[j].colId) { + pSupInfo->slotIds[i] = j; + break; + } + } + + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes); + } } return TSDB_CODE_SUCCESS; @@ -426,8 +444,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd blockDataEnsureCapacity(pReader->pResBlock, pReader->capacity); - setColumnIdList(pReader, pReader->pResBlock); - pReader->suppInfo.slotIds = taosMemoryCalloc(pCond->numOfCols, sizeof(int32_t)); + setColumnIdSlotList(pReader, pReader->pResBlock); pReader->suppInfo.plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES); } @@ -2504,7 +2521,6 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { return code; } - // todo extract method: getCurrentKeyInBuf() SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); @@ -2522,7 +2538,10 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) { buildInmemDataBlock(pReader, pScanInfo, &maxKey); // build data block from in-memory buffer data completed. } else { // whole block is required, return it directly - // todo check the data version + // todo + // 1. the version of all rows should be less than the endVersion + // 2. current block should not overlap with next neighbor block + // 3. current timestamp should not be overlap with each other SDataBlockInfo* pInfo = &pReader->pResBlock->info; pInfo->rows = pBlock->nRow; pInfo->uid = pScanInfo->uid; @@ -2740,12 +2759,30 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow int32_t numOfRows = pBlock->info.rows; int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); + SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; + SColVal colVal = {0}; for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + int32_t slotId = pSupInfo->slotIds[i]; - tTSRowGetVal(pTSRow, pReader->pSchema, pColInfoData->info.colId, &colVal); - colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull); + if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && slotId == 0) { + colDataAppend(pColInfoData, numOfRows, (const char*) &pTSRow->ts, false); + } else { + tTSRowGetVal(pTSRow, pReader->pSchema, slotId, &colVal); + + if (IS_VAR_DATA_TYPE(colVal.type)) { + if (colVal.isNull) { + colDataAppendNULL(pColInfoData, numOfRows); + } else { + varDataSetLen(pSupInfo->buildBuf[i], colVal.value.nData); + memcpy(varDataVal(pSupInfo->buildBuf[i]), colVal.value.pData, colVal.value.nData); + colDataAppend(pColInfoData, numOfRows, pSupInfo->buildBuf[i], false); + } + } else { + colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull); + } + } } pBlock->info.rows += 1; @@ -3135,7 +3172,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { } // current table is exhausted, let's try the next table - pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, &pStatus->pTableIter); + pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); if (pStatus->pTableIter == NULL) { return false; }