提交 0758751b 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 0e676070
......@@ -119,8 +119,6 @@ typedef struct SQueryTableDataCond {
STimeWindow* twindows;
int64_t startVersion;
int64_t endVersion;
int32_t numOfTables; // number of tables
uint64_t* uidList; // table uid list
} SQueryTableDataCond;
void* blockDataDestroy(SSDataBlock* pBlock);
......
......@@ -45,10 +45,6 @@ typedef struct STableBlockScanInfo {
TSKEY lastKey;
SBlockIdx blockIdx;
SArray* pBlockList; // block data index list
// SBlockInfo* pCompInfo;
// int32_t compSize;
// int32_t numOfBlocks : 29; // number of qualified data blocks not the original blocks
uint8_t chosen : 2; // indicate which iterator should move forward
bool iterInit; // whether to initialize the in-memory skip list iterator or not
STbDataIter* iter; // mem buffer skip list iterator
STbDataIter* iiter; // imem buffer skip list iterator
......@@ -79,6 +75,7 @@ typedef struct SIOCostSummary {
typedef struct SBlockLoadSuppInfo {
SColumnDataAgg* pstatis;
SColumnDataAgg** plist;
int16_t* colIds; // column ids for loading file block data
int32_t* slotIds; // colId to slotId
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
} SBlockLoadSuppInfo;
......@@ -133,11 +130,11 @@ struct STsdbReader {
SSDataBlock* pResBlock;
int32_t capacity;
SReaderStatus status;
char* idStr; // query info handle, for debug purpose
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
char* idStr; // query info handle, for debug purpose
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo suppInfo;
SArray* prev; // previous row which is before than time window
SArray* next; // next row which is after the query time window
SArray* prev; // previous row which is before than time window
SArray* next; // next row which is after the query time window
SIOCostSummary cost;
STSchema* pSchema;
......@@ -180,27 +177,31 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
pSupInfo->slotIds = taosMemoryCalloc(numOfCols, sizeof(int16_t));
if (pSupInfo->slotIds == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
size_t numOfCols = blockDataGetNumOfCols(pBlock);
// pSupInfo->slotIds = taosMemoryCalloc(numOfCols, sizeof(int16_t));
// if (pSupInfo->slotIds == NULL) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
pSupInfo->colIds = taosMemoryMalloc(numOfCols * sizeof(int16_t));
pSupInfo->buildBuf = taosMemoryCalloc(numOfCols, POINTER_BYTES);
if (pSupInfo->buildBuf == NULL) {
if (pSupInfo->buildBuf == NULL || pSupInfo->colIds == NULL) {
taosMemoryFree(pSupInfo->colIds);
taosMemoryFree(pSupInfo->buildBuf);
return TSDB_CODE_OUT_OF_MEMORY;
}
STSchema* pSchema = pReader->pSchema;
// STSchema* pSchema = pReader->pSchema;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
pSupInfo->colIds[i] = pCol->info.colId;
for (int32_t j = 0; j < pSchema->numOfCols; ++j) {
if (pCol->info.colId == pSchema->columns[j].colId) {
pSupInfo->slotIds[i] = j;
break;
}
}
// for (int32_t j = 0; j < pSchema->numOfCols; ++j) {
// if (pCol->info.colId == pSchema->columns[j].colId) {
// pSupInfo->slotIds[i] = j;
// break;
// }
// }
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
......@@ -402,7 +403,10 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->type = pCond->type;
pReader->window = *pCond->twindows;
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
if (pReader->suid != 0) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
ASSERT(pReader->pSchema);
}
// todo remove this
setQueryTimewindow(pReader, pCond, 0);
......@@ -653,48 +657,6 @@ _end:
// return hasNext;
// }
// static bool hasMoreDataInCache(STsdbReader* pHandle) {
// STsdbCfg* pCfg = REPO_CFG(pHandle->pTsdb);
// size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
// assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
// pHandle->cur.fid = INT32_MIN;
// STableBlockScanInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
// if (!pCheckInfo->initBuf) {
// initTableMemIterator(pHandle, pCheckInfo);
// }
// STSRow* row = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL, TD_VER_MAX);
// if (row == NULL) {
// return false;
// }
// pCheckInfo->lastKey = TD_ROW_KEY(row); // first timestamp in buffer
// tsdbDebug("%p uid:%" PRId64 ", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle,
// pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr);
// // all data in mem are checked already.
// if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) ||
// (pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_TRAVERSE(pHandle->order))) {
// return false;
// }
// int32_t step = ASCENDING_TRAVERSE(pHandle->order) ? 1 : -1;
// STimeWindow* win = &pHandle->cur.win;
// pHandle->cur.rows = buildInmemDataBlockImpl(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);
// // update the last key value
// pCheckInfo->lastKey = win->ekey + step;
// pHandle->cur.lastKey = win->ekey + step;
// pHandle->cur.mixBlock = true;
// if (!ASCENDING_TRAVERSE(pHandle->order)) {
// TSWAP(win->skey, win->ekey);
// }
// return true;
// }
// static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precision) {
// assert(precision >= TSDB_TIME_PRECISION_MICRO || precision <= TSDB_TIME_PRECISION_NANO);
// if (key == TSKEY_INITIAL_VAL) {
......@@ -829,9 +791,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
}
// 2. version range check
// if (block.minVersion > pReader->startVersion || block.maxVersion < pReader->endVersion) {
// continue;
// }
if (block.minVersion > pReader->verRange.maxVer || block.maxVersion < pReader->verRange.minVer) {
continue;
}
void* p = taosArrayPush(pScanInfo->pBlockList, &block);
if (p == NULL) {
......@@ -877,16 +839,16 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
SSDataBlock* pResBlock = pReader->pResBlock;
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
uint8_t *pb = NULL, *pb1 = NULL;
int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, &pb, &pb1);
int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData, &pb, &pb1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
SColVal cv = {0};
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -2126,22 +2088,6 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
// return getDataBlock(pTsdbReadHandle, pBlockInfo, exists);
// }
// static bool isEndFileDataBlock(SQueryFilePos* cur, int32_t numOfBlocks, bool ascTrav) {
// assert(cur != NULL && numOfBlocks > 0);
// return (cur->slot == numOfBlocks - 1 && ascTrav) || (cur->slot == 0 && !ascTrav);
// }
// static void moveToNextDataBlockInCurrentFile(STsdbReader* pTsdbReadHandle) {
// int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// assert(cur->slot < pTsdbReadHandle->numOfBlocks && cur->slot >= 0);
// cur->slot += step;
// cur->mixBlock = false;
// cur->blockCompleted = false;
// }
// static int32_t getBucketIndex(int32_t startRow, int32_t bucketRange, int32_t numOfRows) {
// return (numOfRows - startRow) / bucketRange;
// }
......@@ -2184,26 +2130,22 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBloc
}
static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) {
if (pBlockScanInfo->iter != NULL) {
pBlockScanInfo->memHasVal = tsdbTbDataIterNext(pBlockScanInfo->iter);
} else {
pBlockScanInfo->memHasVal = false;
}
if (pBlockScanInfo->iiter != NULL) {
pBlockScanInfo->imemHasVal = tsdbTbDataIterNext(pBlockScanInfo->iiter);
} else {
pBlockScanInfo->imemHasVal = false;
}
if (!(pBlockScanInfo->imemHasVal || pBlockScanInfo->memHasVal)) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* pBlock = pReader->pResBlock;
int64_t st = taosGetTimestampUs();
int32_t code = buildInmemDataBlockImpl(pBlockScanInfo, *key, pReader->capacity, pReader);
setComposedBlockFlag(pReader, true);
// set the correct block data info
int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
pReader, elapsedTime, pBlock->info.rows, (int32_t)blockDataGetNumOfCols(pBlock), pReader->idStr);
pBlock->info.uid = pBlockScanInfo->uid;
setComposedBlockFlag(pReader, true);
return code;
}
......@@ -2384,15 +2326,17 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d);
if (d != NULL) {
code = tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s",
pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr);
return code;
} else {
if (code == TSDB_CODE_SUCCESS) {
pBlockScanInfo->memHasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter) != NULL);
tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
"-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, pReader->window.skey, pReader->order, d->minKey, d->maxKey,
pReader->idStr);
} else {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s",
pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr);
return code;
}
}
} else {
......@@ -2404,15 +2348,17 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di);
if (di != NULL) {
code = tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s",
pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr);
return code;
} else {
if (code == TSDB_CODE_SUCCESS) {
pBlockScanInfo->imemHasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter) != NULL);
tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
"-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, pReader->window.skey, pReader->order, di->minKey, di->maxKey,
pReader->idStr);
} else {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s",
pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr);
return code;
}
}
} else {
......@@ -2558,6 +2504,24 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
}
}
static int32_t initForFirstBlockOfFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
int32_t numOfBlocks = 0;
int32_t code = moveToNextFile(pReader, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// all data files are consumed, try data in buffer
if (numOfBlocks == 0) {
pReader->status.loadFromFile = false;
return code;
}
// initialize the block iterator for a new fileset
code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
return code;
}
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -2565,71 +2529,42 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
SFileSetIter* pFIter = &pStatus->fileIter;
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
ASSERT (pFIter->index < pFIter->numOfFiles);
if (pFIter->index < pFIter->numOfFiles) {
if (pReader->status.blockIter.index == -1) {
int32_t numOfBlocks = 0;
code = moveToNextFile(pReader, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// initialize the block iterator for a new fileset
code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
if (pReader->status.blockIter.index == -1) {
code = initForFirstBlockOfFile(pReader, pBlockIter);
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
return code;
}
code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
// current block are exhausted, try the next file block
if (pDumpInfo->rowIndex >= pBlock->nRow) {
// try next data block in current file
bool hasNext = blockIteratorNext(&pReader->status.blockIter);
if (!hasNext) { // current file is exhausted, let's try the next file
code = initForFirstBlockOfFile(pReader, pBlockIter);
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
return code;
}
}
code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
// current block are exhausted, try the next file block
if (pDumpInfo->rowIndex >= pBlock->nRow) {
bool hasNext = blockIteratorNext(&pReader->status.blockIter);
if (!hasNext) { // current file is exhausted, let's try the next file
int32_t numOfBlocks = 0;
code = moveToNextFile(pReader, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// all data files are consumed, try data in buffer
if (numOfBlocks == 0) {
pReader->status.loadFromFile = false;
return code;
} else {
// initialize the block iterator for a new fileset
code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} else { // try next data block in current file
blockIteratorNext(pBlockIter);
code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} else {
code = buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
return code;
}
// repeat the previous procedure.
code = buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
return code;
}
}
......@@ -2759,6 +2694,17 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock*
return TSDB_CODE_SUCCESS;
}
static void checkUpdateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
int32_t sversion = TSDBROW_SVERSION(pRow);
if (pReader->pSchema == NULL) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, sversion);
} else if (pReader->pSchema->version != sversion) {
taosMemoryFreeClear(pReader->pSchema);
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, sversion);
}
}
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) {
TSKEY mergeTs = TSKEY_INITIAL_VAL;
......@@ -2767,7 +2713,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
TSDBKEY k = {.ts = TSKEY_INITIAL_VAL};
TSDBKEY k = {.ts = TSKEY_INITIAL_VAL};
TSDBKEY ik = {.ts = TSKEY_INITIAL_VAL};
if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) {
......@@ -2775,6 +2721,8 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
ik = TSDBROW_KEY(piRow);
if (ik.ts <= k.ts) {
checkUpdateSchema(piRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, piRow, pReader->pSchema);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
......@@ -2785,6 +2733,8 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
tRowMergerGetRow(&merge, pTSRow);
return TSDB_CODE_SUCCESS;
} else { // k.ts < ik.ts
checkUpdateSchema(pRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
......@@ -2792,8 +2742,11 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
}
}
if (pBlockScanInfo->memHasVal) {
k = TSDBROW_KEY(pRow);
checkUpdateSchema(pRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
......@@ -2802,6 +2755,8 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
if (pBlockScanInfo->imemHasVal) {
ik = TSDBROW_KEY(piRow);
checkUpdateSchema(piRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, piRow, pReader->pSchema);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
......@@ -2816,17 +2771,31 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
STSchema *pSchema = pReader->pSchema;
SColVal colVal = {0};
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int32_t slotId = pSupInfo->slotIds[i];
int32_t i = 0, j = 0;
if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && slotId == 0) {
colDataAppend(pColInfoData, numOfRows, (const char*) &pTSRow->ts, false);
} else {
tTSRowGetVal(pTSRow, pReader->pSchema, slotId, &colVal);
doCopyColVal(pColInfoData, i, numOfRows, &colVal, pSupInfo);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (pColInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
colDataAppend(pColInfoData, numOfRows, (const char*)&pTSRow->ts, false);
i += 1;
}
while (i < numOfCols && j < pSchema->numOfCols) {
pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
col_id_t colId = pColInfoData->info.colId;
if (colId == pSchema->columns[j].colId) {
tTSRowGetVal(pTSRow, pReader->pSchema, j, &colVal);
doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
i += 1;
j += 1;
} else if (colId < pSchema->columns[j].colId) {
colDataAppendNULL(pColInfoData, numOfRows);
i += 1;
} else if (colId > pSchema->columns[j].colId) {
j += 1;
}
}
......@@ -2837,7 +2806,6 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow
int32_t buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader) {
SSDataBlock* pBlock = pReader->pResBlock;
int64_t st = taosGetTimestampUs();
do {
STSRow* pTSRow = NULL;
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow);
......@@ -2870,14 +2838,6 @@ int32_t buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY max
} while (1);
ASSERT(pBlock->info.rows <= capacity);
pBlock->info.uid = pBlockScanInfo->uid;
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
pReader, elapsedTime, pBlock->info.rows, numOfCols, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
......@@ -2976,46 +2936,6 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
// return false;
// }
// static bool loadCachedLastRow(STsdbReader* pTsdbReadHandle) {
// // the last row is cached in buffer, return it directly.
// // here note that the pTsdbReadHandle->window must be the TS_INITIALIZER
// int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
// size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
// assert(numOfTables > 0 && numOfCols > 0);
// SQueryFilePos* cur = &pTsdbReadHandle->cur;
// STSRow* pRow = NULL;
// TSKEY key = TSKEY_INITIAL_VAL;
// int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
// TSKEY lastRowKey = TSKEY_INITIAL_VAL;
// int32_t curRow = 0;
// if (++pTsdbReadHandle->activeIndex < numOfTables) {
// STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, pTsdbReadHandle->activeIndex);
// // int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
// // if (ret != TSDB_CODE_SUCCESS) {
// // return false;
// // }
// mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols,
// pCheckInfo->tableId, NULL, NULL, true, &lastRowKey);
// taosMemoryFreeClear(pRow);
// // update the last key value
// pCheckInfo->lastKey = key + step;
// cur->rows = 1; // only one row
// cur->lastKey = key + step;
// cur->mixBlock = true;
// cur->win.skey = key;
// cur->win.ekey = key;
// return true;
// }
// return false;
// }
// static bool loadDataBlockFromTableSeq(STsdbReader* pTsdbReadHandle) {
// size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
// assert(numOfTables > 0);
......@@ -3117,7 +3037,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
return TSDB_CODE_SUCCESS;
}
pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, taosArrayGetSize(pTableList));
int32_t numOfTables = taosArrayGetSize(pTableList);
pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pData, numOfTables);
if (pReader->status.pTableMap == NULL) {
tsdbReaderClose(pReader);
*ppReader = NULL;
......@@ -3154,7 +3075,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
// }
#endif
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, pCond->numOfTables, pReader->idStr);
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code;
_err:
......@@ -3529,26 +3450,33 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
}
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
int64_t rows = 0;
SMemTable* pMemTable = NULL; // pTsdbReadHandle->pMemTable;
int64_t rows = 0;
// if (pMemTable == NULL) {
// return rows;
// }
SReaderStatus* pStatus = &pReader->status;
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
// size_t size = taosArrayGetSize(pReader->pTableCheckInfo);
// for (int32_t i = 0; i < size; ++i) {
// STableBlockScanInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, i);
// // if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
// // pMem = pMemT->tData[pCheckInfo->tableId];
// // rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
// // }
// // if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
// // pIMem = pIMemT->tData[pCheckInfo->tableId];
// // rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
// // }
// }
while (pStatus->pTableIter != NULL) {
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
STbData* d = NULL;
if (pReader->pTsdb->mem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d);
if (d != NULL) {
rows += tsdbGetNRowsInTbData(d);
}
}
STbData* di = NULL;
if (pReader->pTsdb->imem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di);
if (di != NULL) {
rows += tsdbGetNRowsInTbData(di);
}
}
// current table is exhausted, let's try the next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
}
return rows;
}
......@@ -849,7 +849,7 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
uint8_t *pBuf1 = NULL;
uint8_t *pBuf2 = NULL;
ASSERT(nCol == 0 || aColId[0] != PRIMARYKEY_TIMESTAMP_COL_ID);
ASSERT(aColId[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
if (!ppBuf1) ppBuf1 = &pBuf1;
if (!ppBuf2) ppBuf2 = &pBuf2;
......
run tsim/user/pass_alter.sim
run tsim/user/basic1.sim
run tsim/user/privilege2.sim
run tsim/user/user_len.sim
run tsim/user/privilege1.sim
run tsim/user/pass_len.sim
run tsim/user/password.sim
run tsim/user/privilege_db.sim
run tsim/user/privilege_sysinfo.sim
run tsim/user/basic.sim
run tsim/table/basic1.sim
run tsim/trans/lossdata1.sim
run tsim/trans/create_db.sim
......@@ -26,18 +24,23 @@ run tsim/stable/values.sim
run tsim/stable/dnode3.sim
run tsim/stable/alter_insert1.sim
run tsim/stable/refcount.sim
run tsim/stable/tag_filter.sim
run tsim/stable/disk.sim
run tsim/db/basic1.sim
run tsim/db/basic3.sim
run tsim/db/basic7.sim
run tsim/db/basic6.sim
run tsim/db/alter_replica_13.sim
run tsim/db/create_all_options.sim
run tsim/db/basic2.sim
run tsim/db/error1.sim
run tsim/db/alter_replica_31.sim
run tsim/db/taosdlog.sim
run tsim/db/alter_option.sim
run tsim/mnode/basic1.sim
#run tsim/mnode/basic3.sim
run tsim/mnode/basic4.sim
run tsim/mnode/basic3.sim
run tsim/mnode/basic5.sim
run tsim/mnode/basic2.sim
run tsim/parser/fourArithmetic-basic.sim
run tsim/parser/groupby-basic.sim
......@@ -57,11 +60,12 @@ run tsim/query/complex_group.sim
run tsim/query/interval.sim
run tsim/query/session.sim
run tsim/query/scalarFunction.sim
#run tsim/query/scalarNull.sim
run tsim/query/scalarNull.sim
run tsim/query/complex_where.sim
run tsim/tmq/basic1.sim
run tsim/tmq/basic4.sim
run tsim/tmq/basic1Of2Cons.sim
run tsim/tmq/snapshot.sim
run tsim/tmq/prepareBasicEnv-1vgrp.sim
run tsim/tmq/topic.sim
run tsim/tmq/basic4Of2Cons.sim
......@@ -73,14 +77,36 @@ run tsim/tmq/basic3Of2Cons.sim
run tsim/tmq/basic2Of2ConsOverlap.sim
run tsim/tmq/clearConsume.sim
run tsim/qnode/basic1.sim
run tsim/dnode/basic1.sim
run tsim/dnode/redistribute_vgroup_replica3_v3.sim
run tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
run tsim/dnode/redistribute_vgroup_replica3_v2.sim
run tsim/dnode/drop_dnode_has_mnode.sim
run tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim
run tsim/dnode/drop_dnode_has_vnode_replica1.sim
run tsim/dnode/balance_replica3.sim
run tsim/dnode/redistribute_vgroup_replica1.sim
run tsim/dnode/drop_dnode_has_vnode_replica3.sim
run tsim/dnode/balance_replica1.sim
run tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim
run tsim/dnode/drop_dnode_has_qnode_snode.sim
run tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
run tsim/dnode/create_dnode.sim
run tsim/testsuit.sim
run tsim/show/basic.sim
run tsim/stream/basic1.sim
run tsim/stream/windowClose.sim
run tsim/stream/partitionby1.sim
run tsim/stream/triggerInterval0.sim
run tsim/stream/triggerSession0.sim
run tsim/stream/distributeIntervalRetrive0.sim
run tsim/stream/basic0.sim
run tsim/stream/session0.sim
run tsim/stream/schedSnode.sim
run tsim/stream/partitionby.sim
run tsim/stream/session1.sim
run tsim/stream/distributeInterval0.sim
run tsim/stream/distributeSession0.sim
run tsim/stream/state0.sim
run tsim/stream/basic2.sim
run tsim/insert/basic1.sim
run tsim/insert/commit-merge0.sim
......@@ -88,15 +114,18 @@ run tsim/insert/basic0.sim
run tsim/insert/update0.sim
run tsim/insert/backquote.sim
run tsim/insert/null.sim
run tsim/catalog/alterInCurrent.sim
run tsim/sync/oneReplica1VgElectWithInsert.sim
run tsim/sync/threeReplica1VgElect.sim
run tsim/sync/oneReplica1VgElect.sim
run tsim/sync/3Replica5VgElect.sim
run tsim/sync/3Replica5VgElect3mnodedrop.sim
run tsim/sync/3Replica5VgElect3mnode.sim
run tsim/sync/insertDataByRunBack.sim
run tsim/sync/oneReplica5VgElect.sim
run tsim/sync/3Replica1VgElect.sim
run tsim/sync/threeReplica1VgElectWihtInsert.sim
run tsim/sma/tsmaCreateInsertData.sim
run tsim/sma/tsmaCreateInsertQuery.sim
run tsim/sma/rsmaCreateInsertQuery.sim
run tsim/valgrind/basic.sim
run tsim/valgrind/checkError.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册