提交 31dba259 编写于 作者: H Haojun Liao

[td-11818]fix bug in tsdbread.

上级 b6bd41d9
......@@ -85,7 +85,6 @@ enum {
typedef struct STableCheckInfo {
uint64_t tableId;
TSKEY lastKey;
STable* pTableObj;
SBlockInfo* pCompInfo;
int32_t compSize;
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
......@@ -141,8 +140,6 @@ typedef struct STsdbReadHandle {
STableBlockInfo* pDataBlockInfo;
SDataCols *pDataCols; // in order to hold current file data block
int32_t allocSize; // allocated data block size
// STsdb
// STsdbMemTable * pMemTable;
SArray *defaultLoadColumn;// default load column
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
......@@ -204,8 +201,8 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
// the primary timestamp column does not be included in the the specified load column list, add it
if (loadTS && colId != 0) {
int16_t columnId = 0;
if (loadTS && colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;
taosArrayInsert(pLocalIdList, 0, &columnId);
}
......@@ -292,7 +289,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
for (int32_t j = 0; j < gsize; ++j) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable };
STableCheckInfo info = { .lastKey = pKeyInfo->lastKey};
// assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
// info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
......@@ -315,10 +312,9 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
size_t gsize = taosArrayGetSize(pTableCheckInfo);
for (int32_t i = 0; i < gsize; ++i) {
STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
taosArrayPush(pTable, &pInfo->pTableObj);
}
// for (int32_t i = 0; i < gsize; ++i) {
// STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
// }
*psTable = pTable;
return pTableCheckInfo;
......@@ -347,15 +343,11 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
// only one table, not need to sort again
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
SArray* pTable = taosArrayInit(1, sizeof(STable*));
STableCheckInfo info = { .lastKey = skey, .pTableObj = pCheckInfo->pTableObj};
STableCheckInfo info = { .lastKey = skey};
info.tableId = pCheckInfo->tableId;
taosArrayPush(pNew, &info);
taosArrayPush(pTable, &pCheckInfo->pTableObj);
*psTable = pTable;
return pNew;
}
......@@ -461,9 +453,6 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
}
// STsdbMeta* pMeta = NULL;//tsdbGetMeta(tsdb);
// assert(pMeta != NULL);
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock);
if (pReadHandle->pDataCols == NULL) {
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pReadHandle, pReadHandle->qId);
......@@ -641,12 +630,6 @@ SArray* tsdbGetQueriedTableList(tsdbReadHandleT *pHandle) {
size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
SArray* res = taosArrayInit(size, POINTER_BYTES);
for(int32_t i = 0; i < size; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
taosArrayPush(res, &pCheckInfo->pTableObj);
}
return res;
}
......@@ -1049,7 +1032,10 @@ static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, i
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
pCheckInfo->numOfBlocks = 0;
if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
table.pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
code = terrno;
return code;
}
......@@ -1149,7 +1135,7 @@ static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfB
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
int64_t st = taosGetTimestampUs();
STSchema *pSchema = NULL;//tsdbGetTableSchema(pCheckInfo->pTableObj);
STSchema *pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for pDataCols, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId);
......@@ -1184,7 +1170,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
pBlockLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
pBlockLoadInfo->slot = pTsdbReadHandle->cur.slot;
pBlockLoadInfo->uid = pCheckInfo->pTableObj->uid;
pBlockLoadInfo->uid = pCheckInfo->tableId;
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
......@@ -1878,7 +1864,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
STable* pTable = pCheckInfo->pTableObj;
STable* pTable = NULL;
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
......@@ -1932,7 +1918,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
rv2 = memRowVersion(row2);
}
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, true);
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, true);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
......@@ -1958,7 +1944,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, forceSetNull);
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
......@@ -2745,7 +2731,7 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
// if (ret != TSDB_CODE_SUCCESS) {
// return false;
// }
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->pTableObj, NULL, NULL, true);
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId, NULL, NULL, true);
tfree(pRow);
// update the last key value
......@@ -3389,14 +3375,14 @@ SArray* tsdbRetrieveDataBlock(tsdbReadHandleT* pTsdbReadHandle, SArray* pIdList)
if (pHandle->cur.mixBlock) {
return pHandle->pColumns;
} else {
SDataBlockInfo binfo = {0};/*GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);*/
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
assert(pHandle->realNumOfRows <= binfo.rows);
// data block has been loaded, todo extract method
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid &&
pBlockLoadInfo->uid == pCheckInfo->pTableObj->tid) {
pBlockLoadInfo->uid == pCheckInfo->tableId) {
return pHandle->pColumns;
} else { // only load the file block
SBlock* pBlock = pBlockInfo->compBlock;
......
......@@ -551,7 +551,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
int numOfColIds) {
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
ASSERT(colIds[0] == 0);
ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
SDFile * pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
SBlockCol blockCol = {0};
......@@ -588,7 +588,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
if (pDataCol == NULL) continue;
ASSERT(pDataCol->colId == colId);
if (colId == 0) { // load the key row
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { // load the key row
blockCol.colId = colId;
blockCol.len = pBlock->keyLen;
blockCol.type = pDataCol->type;
......
......@@ -4927,6 +4927,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq));
if (NULL == pMsg) { // todo handle malloc error
}
SEpSet epSet;
......@@ -7381,6 +7382,7 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r
cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets);
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
cond.twindow = pTableScanNode->window;
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
for(int32_t i = 0; i < cond.numOfCols; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册