提交 0c111cd2 编写于 作者: M Minglei Jin

fix: update schema to newest version to parsing rows

上级 06c05a8e
......@@ -80,15 +80,15 @@ typedef struct SFilesetIter {
typedef struct SFileDataBlockInfo {
// index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
uint64_t uid;
int32_t tbBlockIdx;
int32_t tbBlockIdx;
} SFileDataBlockInfo;
typedef struct SDataBlockIter {
int32_t numOfBlocks;
int32_t index;
SArray* blockList; // SArray<SFileDataBlockInfo>
int32_t order;
SBlock block; // current SBlock data
int32_t numOfBlocks;
int32_t index;
SArray* blockList; // SArray<SFileDataBlockInfo>
int32_t order;
SBlock block; // current SBlock data
SHashObj* pTableMap;
} SDataBlockIter;
......@@ -124,8 +124,8 @@ struct STsdbReader {
SSDataBlock* pResBlock;
int32_t capacity;
SReaderStatus status;
char* idStr; // query info handle, for debug purpose
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
char* idStr; // query info handle, for debug purpose
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap;
SIOCostSummary cost;
......@@ -133,8 +133,8 @@ struct STsdbReader {
SDataFReader* pFileReader;
SVersionRange verRange;
int32_t step;
STsdbReader* innerReader[2];
int32_t step;
STsdbReader* innerReader[2];
};
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
......@@ -211,8 +211,8 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
pTsdbReader->idStr);
}
tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables, (sizeof(STableBlockScanInfo)*numOfTables)/1024.0,
pTsdbReader->idStr);
tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables,
(sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr);
return pTableMap;
}
......@@ -396,7 +396,8 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
return pResBlock;
}
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity, const char* idstr) {
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
const char* idstr) {
int32_t code = 0;
int8_t level = 0;
STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader));
......@@ -576,7 +577,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
int64_t et2 = taosGetTimestampUs();
tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
(int32_t)num, (et1 - st)/1000.0, (et2-et1)/1000.0, num * sizeof(SBlockIdx)/1024.0, pReader->idStr);
(int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr);
pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;
......@@ -591,7 +592,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
*numOfValidTables = 0;
int64_t st = taosGetTimestampUs();
size_t size = 0;
size_t size = 0;
STableBlockScanInfo* px = NULL;
while (1) {
......@@ -641,9 +642,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
}
}
double el = (taosGetTimestampUs() - st)/1000.0;
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s",
numOfTables, *numOfBlocks, *numOfValidTables, size/1000.0, el, pReader->idStr);
numOfTables, *numOfBlocks, *numOfValidTables, size / 1000.0, el, pReader->idStr);
pReader->cost.numOfBlocks += (*numOfBlocks);
pReader->cost.headFileLoadTime += el;
......@@ -679,9 +680,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
return pFBlockInfo;
}
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) {
return &pBlockIter->block;
}
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
SReaderStatus* pStatus = &pReader->status;
......@@ -689,7 +688,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
SBlockData* pBlockData = &pStatus->fileBlockData;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SBlock* pBlock = getCurrentBlock(pBlockIter);
SBlock* pBlock = getCurrentBlock(pBlockIter);
SSDataBlock* pResBlock = pReader->pResBlock;
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
......@@ -772,17 +771,17 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
int64_t st = taosGetTimestampUs();
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SBlock* pBlock = getCurrentBlock(pBlockIter);
SBlock* pBlock = getCurrentBlock(pBlockIter);
SSDataBlock* pResBlock = pReader->pResBlock;
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
SSDataBlock* pResBlock = pReader->pResBlock;
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockIdx blockIdx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
int32_t code = tsdbReadColData(pReader->pFileReader, &blockIdx, pBlock, pSupInfo->colIds, numOfCols,
pBlockData, NULL, NULL);
SBlockIdx blockIdx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
int32_t code =
tsdbReadColData(pReader->pFileReader, &blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData, NULL, NULL);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -857,7 +856,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
}
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
......@@ -882,7 +881,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
int64_t st = taosGetTimestampUs();
SBlockOrderSupporter sup = {0};
int32_t code = initBlockOrderSupporter(&sup, numOfTables);
int32_t code = initBlockOrderSupporter(&sup, numOfTables);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -937,8 +936,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
}
int64_t et = taosGetTimestampUs();
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", pReader, cnt,
(et - st)/1000.0, pReader->idStr);
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
pReader, cnt, (et - st) / 1000.0, pReader->idStr);
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
cleanupBlockOrderSupporter(&sup);
......@@ -976,7 +975,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
}
int64_t et = taosGetTimestampUs();
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et-st)/1000.0, pReader->idStr);
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et - st) / 1000.0,
pReader->idStr);
cleanupBlockOrderSupporter(&sup);
taosMemoryFree(pTree);
......@@ -1024,7 +1024,7 @@ static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STab
int32_t step = asc ? 1 : -1;
*nextIndex = pFBlockInfo->tbBlockIdx + step;
SBlock *pBlock = taosMemoryCalloc(1, sizeof(SBlock));
SBlock* pBlock = taosMemoryCalloc(1, sizeof(SBlock));
int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
......@@ -1196,10 +1196,10 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
setComposedBlockFlag(pReader, true);
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug(
"%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange: %" PRId64
" - %" PRId64 " %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader->idStr);
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange: %" PRId64
" - %" PRId64 " %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pReader->idStr);
pReader->cost.buildmemBlock += elapsedTime;
return code;
......@@ -1499,9 +1499,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo*
setComposedBlockFlag(pReader, true);
int64_t et = taosGetTimestampUs();
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s", pReader,
pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows,
(et - st)/1000.0, pReader->idStr);
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
......@@ -1679,7 +1680,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
SReaderStatus* pStatus = &pReader->status;
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
while (1) {
......@@ -2212,7 +2213,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
CHECK_FILEBLOCK_STATE st;
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SBlock* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
SBlock* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
if (st == CHECK_FILEBLOCK_QUIT) {
break;
......@@ -2228,7 +2229,7 @@ void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
if (pReader->pSchema == NULL) {
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pSchema);
} else if (pReader->pSchema->version != sversion) {
} else if (pReader->pSchema->version < sversion) {
taosMemoryFreeClear(pReader->pSchema);
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pSchema);
}
......@@ -2466,7 +2467,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
// update the SQueryTableDataCond to create inner reader
STimeWindow w = pCond->twindows;
int32_t order = pCond->order;
int32_t order = pCond->order;
if (order == TSDB_ORDER_ASC) {
pCond->twindows.ekey = pCond->twindows.skey;
pCond->twindows.skey = INT64_MIN;
......@@ -2485,7 +2486,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
if (order == TSDB_ORDER_ASC) {
pCond->twindows.skey = w.ekey;
pCond->twindows.ekey = INT64_MAX;
} else {
} else {
pCond->twindows.skey = INT64_MIN;
pCond->twindows.ekey = w.ekey;
}
......@@ -2533,10 +2534,11 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
}
}
} else {
STsdbReader* pPrevReader = pReader->innerReader[0];
STsdbReader* pPrevReader = pReader->innerReader[0];
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order, pPrevReader->idStr);
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order,
pPrevReader->idStr);
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);
// no data in files, let's try buffer in memory
......@@ -2591,12 +2593,14 @@ void tsdbReaderClose(STsdbReader* pReader) {
SIOCostSummary* pCost = &pReader->cost;
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%"PRId64" SMA-time:%.2f ms, "
"fileBlocks:%"PRId64", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo "
"size:%.2f Kb %s",
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, "
"fileBlocks:%" PRId64
", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo "
"size:%.2f Kb %s",
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime,
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock,
numOfTables * sizeof(STableBlockScanInfo) /1000.0, pReader->idStr);
numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr);
taosMemoryFree(pReader->idStr);
taosMemoryFree(pReader->pSchema);
......@@ -2675,9 +2679,9 @@ static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
if (pReader->step == EXTERNAL_ROWS_MAIN) {
if (pReader->step == EXTERNAL_ROWS_MAIN) {
setBlockInfo(pReader, pDataBlockInfo);
} else if (pReader->step == EXTERNAL_ROWS_PREV) {
} else if (pReader->step == EXTERNAL_ROWS_PREV) {
setBlockInfo(pReader->innerReader[0], pDataBlockInfo);
} else {
setBlockInfo(pReader->innerReader[1], pDataBlockInfo);
......@@ -2691,7 +2695,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
int32_t code = 0;
*allHave = false;
if(pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
*pBlockStatis = NULL;
return TSDB_CODE_SUCCESS;
}
......@@ -2702,7 +2706,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
return TSDB_CODE_SUCCESS;
}
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
int64_t stime = taosGetTimestampUs();
......@@ -2807,7 +2811,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
}
pReader->order = pCond->order;
pReader->type = TIMEWINDOW_RANGE_CONTAINED;
pReader->type = TIMEWINDOW_RANGE_CONTAINED;
pReader->status.loadFromFile = true;
pReader->status.pTableIter = NULL;
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
......@@ -2826,7 +2830,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
resetDataBlockScanInfo(pReader->status.pTableMap);
int32_t code = 0;
int32_t code = 0;
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
// no data in files, let's try buffer in memory
......@@ -2835,8 +2839,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s",
pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
return code;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册