提交 05d0a0fe 编写于 作者: S Shengliang Guan

Merge branch 'enh/TD-20043' of https://github.com/taosdata/TDengine into enh/TD-20043

......@@ -155,15 +155,13 @@ int32_t tCmprBlockL(void const *lhs, void const *rhs);
int32_t tBlockDataCreate(SBlockData *pBlockData);
void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear);
int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid);
int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom);
void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
void tBlockDataClear(SBlockData *pBlockData);
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData);
int32_t tBlockDataAddColData(SBlockData *pBlockData, SColData **ppColData);
int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[],
int32_t aBufN[]);
int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]);
......@@ -473,7 +471,7 @@ struct SBlockData {
int64_t *aUid; // uids of each row, only exist in block data in .last file (uid == 0)
int64_t *aVersion; // versions of each row
TSKEY *aTSKEY; // timestamp of each row
SArray *aIdx; // SArray<int32_t>
int32_t nColData;
SArray *aColData; // SArray<SColData>
};
......@@ -716,14 +714,14 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
// tsdbCache ==============================================================================================
typedef struct SCacheRowsReader {
SVnode *pVnode;
STSchema *pSchema;
uint64_t uid;
uint64_t suid;
char **transferBuf; // todo remove it soon
int32_t numOfCols;
int32_t type;
int32_t tableIndex; // currently returned result tables
SVnode *pVnode;
STSchema *pSchema;
uint64_t uid;
uint64_t suid;
char **transferBuf; // todo remove it soon
int32_t numOfCols;
int32_t type;
int32_t tableIndex; // currently returned result tables
STableKeyInfo *pTableList; // table id list
int32_t numOfTables;
......
......@@ -135,22 +135,22 @@ typedef struct SUidOrderCheckInfo {
} SUidOrderCheckInfo;
typedef struct SReaderStatus {
bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
SUidOrderCheckInfo uidCheckInfo; // check all table in uid order
SFileBlockDumpInfo fBlockDumpInfo;
SDFileSet* pCurrentFileset; // current opened file set
SBlockData fileBlockData;
SFilesetIter fileIter;
SDataBlockIter blockIter;
SUidOrderCheckInfo uidCheckInfo; // check all table in uid order
SFileBlockDumpInfo fBlockDumpInfo;
SDFileSet* pCurrentFileset; // current opened file set
SBlockData fileBlockData;
SFilesetIter fileIter;
SDataBlockIter blockIter;
} SReaderStatus;
typedef struct SBlockInfoBuf {
int32_t currentIndex;
SArray* pData;
int32_t numPerBucket;
int32_t currentIndex;
SArray* pData;
int32_t numPerBucket;
} SBlockInfoBuf;
struct STsdbReader {
......@@ -185,11 +185,13 @@ static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STabl
SRowMerger* pMerger, SVersionRange* pVerRange);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, STableBlockScanInfo* pInfo);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
STableBlockScanInfo* pInfo);
static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
int32_t rowIndex);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
SVersionRange* pVerRange);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
......@@ -238,13 +240,13 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
}
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
int32_t num = numOfTables / pBuf->numPerBucket;
int32_t num = numOfTables / pBuf->numPerBucket;
int32_t remainder = numOfTables % pBuf->numPerBucket;
if (pBuf->pData == NULL) {
pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
}
for(int32_t i = 0; i < num; ++i) {
for (int32_t i = 0; i < num; ++i) {
char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -266,7 +268,7 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
size_t num = taosArrayGetSize(pBuf->pData);
for(int32_t i = 0; i < num; ++i) {
for (int32_t i = 0; i < num; ++i) {
char** p = taosArrayGet(pBuf->pData, i);
taosMemoryFree(*p);
}
......@@ -276,7 +278,7 @@ static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
int32_t bucketIndex = index / pBuf->numPerBucket;
char** pBucket = taosArrayGet(pBuf->pData, bucketIndex);
char** pBucket = taosArrayGet(pBuf->pData, bucketIndex);
return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
}
......@@ -319,8 +321,8 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
#endif
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, pScanInfo->lastKey,
pTsdbReader->idStr);
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
pScanInfo->lastKey, pTsdbReader->idStr);
}
pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
......@@ -334,7 +336,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
STableBlockScanInfo** p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**) p;
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
pInfo->iiter.hasVal = false;
......@@ -571,7 +573,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
pReader->type = pCond->type;
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
ASSERT(pCond->numOfCols > 0);
limitOutputBufferSize(pCond, &pReader->capacity);
......@@ -708,7 +710,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
}
SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset};
bIndex.window = (STimeWindow) {.skey = block.minKey.ts, .ekey = block.maxKey.ts};
bIndex.window = (STimeWindow){.skey = block.minKey.ts, .ekey = block.maxKey.ts};
void* p = taosArrayPush(pScanInfo->pBlockList, &bIndex);
if (p == NULL) {
......@@ -969,7 +971,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
}
int32_t colIndex = 0;
int32_t num = taosArrayGetSize(pBlockData->aIdx);
int32_t num = pBlockData->nColData;
while (i < numOfOutputCols && colIndex < num) {
rowIndex = 0;
pColData = taosArrayGet(pResBlock->pDataBlock, i);
......@@ -1036,7 +1038,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
......@@ -1063,7 +1065,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, code:%s %s",
", rows:%d, code:%s %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
tstrerror(code), pReader->idStr);
return code;
......@@ -1072,7 +1074,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
......@@ -1300,8 +1302,8 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl
int32_t step = asc ? 1 : -1;
*nextIndex = pBlockInfo->tbBlockIdx + step;
*pBlockIndex = *(SBlockIndex*) taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
// tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk);
*pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
// tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk);
return true;
}
......@@ -1365,7 +1367,8 @@ static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pV
(pBlock->minVer <= pVerRange->maxVer);
}
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t startIndex) {
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
int32_t startIndex) {
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
for (int32_t i = startIndex; i < num; i += 1) {
......@@ -1514,7 +1517,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
" - %" PRId64 " %s",
" - %" PRId64 " %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pReader->idStr);
......@@ -2313,7 +2316,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
goto _end;
}
pBlockScanInfo = *(STableBlockScanInfo**) p;
pBlockScanInfo = *(STableBlockScanInfo**)p;
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
......@@ -2324,7 +2327,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
// record the last key value
pBlockScanInfo->lastKey = asc? pBlock->maxKey.ts:pBlock->minKey.ts;
pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
goto _end;
}
}
......@@ -2387,7 +2390,7 @@ _end:
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%d, elapsed time:%.2f ms %s",
pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
......@@ -2553,7 +2556,7 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea
void* p = taosHashIterate(pStatus->pTableMap, NULL);
while (p != NULL) {
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**) p;
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
pOrderCheckInfo->tableUidList[index++] = pScanInfo->uid;
p = taosHashIterate(pStatus->pTableMap, p);
}
......@@ -2627,7 +2630,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
while (1) {
// load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**) pStatus->pTableIter;
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasVal) {
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
......@@ -2665,7 +2668,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
if (pBlockInfo != NULL) {
pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
pScanInfo =
*(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
} else {
pScanInfo = *pReader->status.pTableIter;
}
......@@ -2717,7 +2721,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order)? pInfo->window.ekey:pInfo->window.skey;
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
}
}
......@@ -2897,8 +2901,8 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
int8_t precision = pVnode->config.tsdbCfg.precision;
int64_t now = taosGetTimestamp(precision);
int64_t offset = tsQueryRsmaTolerance * ((precision == TSDB_TIME_PRECISION_MILLI) ? 1L
: (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L
: 1000000L);
: (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L
: 1000000L);
for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
SRetention* pRetention = retentions + level;
......@@ -3414,7 +3418,8 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
return TSDB_CODE_SUCCESS;
}
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, STableBlockScanInfo* pScanInfo) {
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
STableBlockScanInfo* pScanInfo) {
int32_t numOfRows = pBlock->info.rows;
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
int64_t uid = pScanInfo->uid;
......@@ -3474,7 +3479,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
}
SColVal cv = {0};
int32_t numOfInputCols = pBlockData->aIdx->size;
int32_t numOfInputCols = pBlockData->nColData;
int32_t numOfOutputCols = pResBlock->pDataBlock->size;
while (i < numOfOutputCols && j < numOfInputCols) {
......@@ -3555,8 +3560,8 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
taosHashClear(pReader->status.pTableMap);
STableKeyInfo* pList = (STableKeyInfo*) pTableList;
for(int32_t i = 0; i < num; ++i) {
STableKeyInfo* pList = (STableKeyInfo*)pTableList;
for (int32_t i = 0; i < num; ++i) {
STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
pInfo->uid = pList[i].uid;
taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
......@@ -3714,7 +3719,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code;
_err:
_err:
tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
return code;
}
......@@ -3882,7 +3887,8 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
}
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
STableBlockScanInfo* pBlockScanInfo =
*(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
if (pBlockScanInfo == NULL) { // no data block for the table of given uid
return false;
}
......@@ -3929,7 +3935,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
// int64_t stime = taosGetTimestampUs();
// int64_t stime = taosGetTimestampUs();
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
......@@ -3960,7 +3966,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
int32_t i = 0, j = 0;
size_t size = taosArrayGetSize(pSup->pColAgg);
size_t size = taosArrayGetSize(pSup->pColAgg);
while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
......@@ -3995,7 +4001,8 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
}
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
STableBlockScanInfo* pBlockScanInfo =
*(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pBlockScanInfo == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
......@@ -4069,7 +4076,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
}
tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
" in query %s",
" in query %s",
pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
pReader->idStr);
......@@ -4260,7 +4267,7 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr
}
tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
_exit:
_exit:
return code;
}
......
......@@ -514,7 +514,7 @@ static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData,
pSmaInfo->size = 0;
// encode
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
if ((!pColData->smaOn) || IS_VAR_DATA_TYPE(pColData->type)) continue;
......@@ -1112,7 +1112,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
ASSERT(p - pReader->aBuf[0] == pBlkInfo->szKey);
// read and decode columns
if (taosArrayGetSize(pBlockData->aIdx) == 0) goto _exit;
if (pBlockData->nColData == 0) goto _exit;
if (hdr.szBlkCol > 0) {
int64_t offset = pBlkInfo->offset + pBlkInfo->szKey;
......@@ -1128,7 +1128,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo
SBlockCol *pBlockCol = &blockCol;
int32_t n = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
while (pBlockCol && pBlockCol->cid < pColData->cid) {
......@@ -1212,49 +1212,6 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData
ASSERT(pDataBlk->nSubBlock == 1);
#if 0
if (pDataBlk->nSubBlock > 1) {
SBlockData bData1;
SBlockData bData2;
// create
code = tBlockDataCreate(&bData1);
if (code) goto _err;
code = tBlockDataCreate(&bData2);
if (code) goto _err;
// init
tBlockDataInitEx(&bData1, pBlockData);
tBlockDataInitEx(&bData2, pBlockData);
for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], &bData1);
if (code) {
tBlockDataDestroy(&bData1, 1);
tBlockDataDestroy(&bData2, 1);
goto _err;
}
code = tBlockDataCopy(pBlockData, &bData2);
if (code) {
tBlockDataDestroy(&bData1, 1);
tBlockDataDestroy(&bData2, 1);
goto _err;
}
code = tBlockDataMerge(&bData1, &bData2, pBlockData);
if (code) {
tBlockDataDestroy(&bData1, 1);
tBlockDataDestroy(&bData2, 1);
goto _err;
}
}
tBlockDataDestroy(&bData1, 1);
tBlockDataDestroy(&bData2, 1);
}
#endif
return code;
_err:
......
......@@ -613,7 +613,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
return &pIter->colVal;
}
} else {
if (pIter->i < taosArrayGetSize(pIter->pRow->pBlockData->aIdx)) {
if (pIter->i < pIter->pRow->pBlockData->nColData) {
SColData *pColData = tBlockDataGetColDataByIdx(pIter->pRow->pBlockData, pIter->i);
tColDataGetValue(pColData, pIter->pRow->iRow, &pIter->colVal);
......@@ -917,14 +917,9 @@ int32_t tBlockDataCreate(SBlockData *pBlockData) {
pBlockData->aUid = NULL;
pBlockData->aVersion = NULL;
pBlockData->aTSKEY = NULL;
pBlockData->aIdx = taosArrayInit(0, sizeof(int32_t));
if (pBlockData->aIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pBlockData->nColData = 0;
pBlockData->aColData = taosArrayInit(0, sizeof(SColData));
if (pBlockData->aColData == NULL) {
taosArrayDestroy(pBlockData->aIdx);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
......@@ -937,12 +932,10 @@ void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) {
tFree((uint8_t *)pBlockData->aUid);
tFree((uint8_t *)pBlockData->aVersion);
tFree((uint8_t *)pBlockData->aTSKEY);
taosArrayDestroy(pBlockData->aIdx);
taosArrayDestroyEx(pBlockData->aColData, deepClear ? tColDataDestroy : NULL);
pBlockData->aUid = NULL;
pBlockData->aVersion = NULL;
pBlockData->aTSKEY = NULL;
pBlockData->aIdx = NULL;
pBlockData->aColData = NULL;
}
......@@ -955,7 +948,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema,
pBlockData->uid = pId->uid;
pBlockData->nRow = 0;
taosArrayClear(pBlockData->aIdx);
pBlockData->nColData = 0;
if (aCid) {
int32_t iColumn = 1;
STColumn *pTColumn = &pTSchema->columns[iColumn];
......@@ -969,7 +962,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema,
break;
} else if (pTColumn->colId == aCid[iCid]) {
SColData *pColData;
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
code = tBlockDataAddColData(pBlockData, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0);
......@@ -982,7 +975,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema,
STColumn *pTColumn = &pTSchema->columns[iColumn];
SColData *pColData;
code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData);
code = tBlockDataAddColData(pBlockData, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0);
......@@ -993,64 +986,36 @@ _exit:
return code;
}
int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom) {
int32_t code = 0;
ASSERT(pBlockDataFrom->suid || pBlockDataFrom->uid);
pBlockData->suid = pBlockDataFrom->suid;
pBlockData->uid = pBlockDataFrom->uid;
pBlockData->nRow = 0;
taosArrayClear(pBlockData->aIdx);
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockDataFrom->aIdx); iColData++) {
SColData *pColDataFrom = tBlockDataGetColDataByIdx(pBlockDataFrom, iColData);
SColData *pColData;
code = tBlockDataAddColData(pBlockData, iColData, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColDataFrom->cid, pColDataFrom->type, pColDataFrom->smaOn);
}
_exit:
return code;
}
void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->suid = 0;
pBlockData->uid = 0;
pBlockData->nRow = 0;
taosArrayClear(pBlockData->aIdx);
pBlockData->nColData = 0;
}
void tBlockDataClear(SBlockData *pBlockData) {
ASSERT(pBlockData->suid || pBlockData->uid);
pBlockData->nRow = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
tColDataClear(pColData);
}
}
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData) {
int32_t tBlockDataAddColData(SBlockData *pBlockData, SColData **ppColData) {
int32_t code = 0;
SColData *pColData = NULL;
int32_t idx = taosArrayGetSize(pBlockData->aIdx);
if (idx >= taosArrayGetSize(pBlockData->aColData)) {
if (pBlockData->nColData >= taosArrayGetSize(pBlockData->aColData)) {
if (taosArrayPush(pBlockData->aColData, &((SColData){0})) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
pColData = (SColData *)taosArrayGet(pBlockData->aColData, idx);
pColData = (SColData *)taosArrayGet(pBlockData->aColData, pBlockData->nColData);
if (taosArrayInsert(pBlockData->aIdx, iColData, &idx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pBlockData->nColData++;
*ppColData = pColData;
return code;
......@@ -1087,7 +1052,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
tRowIterInit(&rIter, pRow, pTSchema);
pColVal = tRowIterNext(&rIter);
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
while (pColVal && pColVal->cid < pColData->cid) {
......@@ -1115,19 +1080,19 @@ int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFr
int32_t code = 0;
int32_t iColData = 0;
for (int32_t iColDataFrom = 0; iColDataFrom < taosArrayGetSize(pBlockDataFrom->aIdx); iColDataFrom++) {
for (int32_t iColDataFrom = 0; iColDataFrom < pBlockDataFrom->nColData; iColDataFrom++) {
SColData *pColDataFrom = tBlockDataGetColDataByIdx(pBlockDataFrom, iColDataFrom);
while (true) {
SColData *pColData;
if (iColData < taosArrayGetSize(pBlockData->aIdx)) {
if (iColData < pBlockData->nColData) {
pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
} else {
pColData = NULL;
}
if (pColData == NULL || pColData->cid > pColDataFrom->cid) {
code = tBlockDataAddColData(pBlockData, iColData, &pColData);
code = tBlockDataAddColData(pBlockData, &pColData);
if (code) goto _exit;
tColDataInit(pColData, pColDataFrom->cid, pColDataFrom->type, pColDataFrom->smaOn);
......@@ -1226,55 +1191,15 @@ _exit:
return code;
}
int32_t tBlockDataCopy(SBlockData *pSrc, SBlockData *pDest) {
int32_t code = 0;
tBlockDataClear(pDest);
ASSERT(pDest->suid == pSrc->suid);
ASSERT(pDest->uid == pSrc->uid);
ASSERT(taosArrayGetSize(pSrc->aIdx) == taosArrayGetSize(pDest->aIdx));
pDest->nRow = pSrc->nRow;
if (pSrc->uid == 0) {
code = tRealloc((uint8_t **)&pDest->aUid, sizeof(int64_t) * pDest->nRow);
if (code) goto _exit;
memcpy(pDest->aUid, pSrc->aUid, sizeof(int64_t) * pDest->nRow);
}
code = tRealloc((uint8_t **)&pDest->aVersion, sizeof(int64_t) * pDest->nRow);
if (code) goto _exit;
memcpy(pDest->aVersion, pSrc->aVersion, sizeof(int64_t) * pDest->nRow);
code = tRealloc((uint8_t **)&pDest->aTSKEY, sizeof(TSKEY) * pDest->nRow);
if (code) goto _exit;
memcpy(pDest->aTSKEY, pSrc->aTSKEY, sizeof(TSKEY) * pDest->nRow);
for (int32_t iColData = 0; iColData < taosArrayGetSize(pSrc->aIdx); iColData++) {
SColData *pColSrc = tBlockDataGetColDataByIdx(pSrc, iColData);
SColData *pColDest = tBlockDataGetColDataByIdx(pDest, iColData);
ASSERT(pColSrc->cid == pColDest->cid);
ASSERT(pColSrc->type == pColDest->type);
code = tColDataCopy(pColSrc, pColDest);
if (code) goto _exit;
}
_exit:
return code;
}
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx) {
ASSERT(idx >= 0 && idx < taosArrayGetSize(pBlockData->aIdx));
return (SColData *)taosArrayGet(pBlockData->aColData, *(int32_t *)taosArrayGet(pBlockData->aIdx, idx));
ASSERT(idx >= 0 && idx < pBlockData->nColData);
return (SColData *)taosArrayGet(pBlockData->aColData, idx);
}
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) {
ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID);
int32_t lidx = 0;
int32_t ridx = taosArrayGetSize(pBlockData->aIdx) - 1;
int32_t ridx = pBlockData->nColData - 1;
while (lidx <= ridx) {
int32_t midx = (lidx + ridx) / 2;
......@@ -1308,7 +1233,7 @@ int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut,
// encode =================
// columns AND SBlockCol
aBufN[0] = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
ASSERT(pColData->flag);
......@@ -1431,7 +1356,7 @@ int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uin
ASSERT(nt <= hdr.szBlkCol);
SColData *pColData;
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
code = tBlockDataAddColData(pBlockData, &pColData);
if (code) goto _exit;
tColDataInit(pColData, blockCol.cid, blockCol.type, blockCol.smaOn);
......
......@@ -79,7 +79,7 @@ void syncNodeRemove(int64_t rid) { taosRemoveRef(gNodeRefId, rid); }
SSyncNode *syncNodeAcquire(int64_t rid) {
SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid);
if (pNode == NULL) {
sTrace("failed to acquire node from refId:%" PRId64, rid);
sError("failed to acquire node from refId:%" PRId64, rid);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
......
......@@ -221,8 +221,12 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) return -1;
if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("sync begin snapshot error");
return -1;
}
int32_t code = 0;
if (syncNodeIsMnode(pSyncNode)) {
......@@ -330,7 +334,10 @@ _DEL_WAL:
int32_t syncEndSnapshot(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) return -1;
if (pSyncNode == NULL) {
sError("sync end snapshot error");
return -1;
}
int32_t code = 0;
if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
......@@ -352,7 +359,10 @@ int32_t syncEndSnapshot(int64_t rid) {
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) return -1;
if (pSyncNode == NULL) {
sError("sync step down error");
return -1;
}
syncNodeStepDown(pSyncNode, newTerm);
syncNodeRelease(pSyncNode);
......@@ -361,7 +371,10 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
bool syncIsReadyForRead(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) return -1;
if (pSyncNode == NULL) {
sError("sync ready for read error");
return false;
}
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) {
syncNodeRelease(pSyncNode);
......@@ -651,7 +664,10 @@ static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandl
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) return -1;
if (pSyncNode == NULL) {
sError("sync propose error");
return -1;
}
int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak);
syncNodeRelease(pSyncNode);
......@@ -2528,9 +2544,19 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd
// append entry
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
if (code != 0) {
// del resp mgr, call FpCommitCb
ASSERT(0);
return -1;
if (ths->replicaNum == 1) {
if (h) {
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
} else {
syncEntryDestory(pEntry);
}
return -1;
} else {
// del resp mgr, call FpCommitCb
ASSERT(0);
return -1;
}
}
// if mulit replica, start replicate right now
......
......@@ -236,6 +236,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
SWalReader* pWalHandle = pData->pWalHandle;
if (pWalHandle == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId);
return -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册