提交 064b2082 编写于 作者: H Hongze Cheng

Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_snapshot

Subproject commit 1c8924dc668e6aa848214c2fc54e3ace3f5bf8df Subproject commit 7ed7a97715388fa144718764d6bf20f9bfc29a12
...@@ -72,7 +72,7 @@ int32_t* taosGetErrno(); ...@@ -72,7 +72,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030) #define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031) #define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032) #define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032)
#define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0032) #define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0033)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
......
...@@ -1831,6 +1831,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1831,6 +1831,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
pSubmitBlk->suid = suid; pSubmitBlk->suid = suid;
pSubmitBlk->uid = pDataBlock->info.groupId; pSubmitBlk->uid = pDataBlock->info.groupId;
pSubmitBlk->numOfRows = rows; pSubmitBlk->numOfRows = rows;
pSubmitBlk->sversion = pTSchema->version;
msgLen += sizeof(SSubmitBlk); msgLen += sizeof(SSubmitBlk);
int32_t dataLen = 0; int32_t dataLen = 0;
......
...@@ -140,6 +140,7 @@ int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData ...@@ -140,6 +140,7 @@ int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData);
int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest);
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
// SDelIdx // SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tPutDelIdx(uint8_t *p, void *ph);
...@@ -419,7 +420,7 @@ struct SBlockData { ...@@ -419,7 +420,7 @@ struct SBlockData {
int32_t nRow; int32_t nRow;
int64_t *aVersion; int64_t *aVersion;
TSKEY *aTSKEY; TSKEY *aTSKEY;
SArray *aColDataP; // SArray<SColData *> SArray *aIdx; // SArray<int32_t>
SArray *aColData; // SArray<SColData> SArray *aColData; // SArray<SColData>
}; };
......
...@@ -562,7 +562,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) ...@@ -562,7 +562,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
goto _err; goto _err;
} }
if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) { if (pReq && tdProcessSubmitReq(sinkTsdb, 1, pReq) < 0) {
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
goto _err; goto _err;
} }
......
...@@ -87,6 +87,7 @@ int32_t tqMetaOpen(STQ* pTq) { ...@@ -87,6 +87,7 @@ int32_t tqMetaOpen(STQ* pTq) {
.reader = handle.execHandle.pExecReader[i], .reader = handle.execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb, .pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
}; };
handle.execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(handle.execHandle.execCol.qmsg, &reader); handle.execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(handle.execHandle.execCol.qmsg, &reader);
ASSERT(handle.execHandle.execCol.task[i]); ASSERT(handle.execHandle.execCol.task[i]);
......
...@@ -154,6 +154,7 @@ static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo *pIter, SArr ...@@ -154,6 +154,7 @@ static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo *pIter, SArr
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
STSRow** pTSRow); STSRow** pTSRow);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr);
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
...@@ -373,7 +374,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd ...@@ -373,7 +374,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
initReaderStatus(&pReader->status); initReaderStatus(&pReader->status);
pReader->pTsdb = pVnode->pTsdb; pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions, idstr);
pReader->suid = pCond->suid; pReader->suid = pCond->suid;
pReader->order = pCond->order; pReader->order = pCond->order;
pReader->capacity = 4096; pReader->capacity = 4096;
...@@ -755,11 +756,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -755,11 +756,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
i += 1; i += 1;
} }
while (i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aColDataP)) { while (i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aIdx)) {
rowIndex = 0; rowIndex = 0;
pColData = taosArrayGet(pResBlock->pDataBlock, i); pColData = taosArrayGet(pResBlock->pDataBlock, i);
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, colIndex); SColData* pData = tBlockDataGetColDataByIdx(pBlockData, colIndex);
if (pData->cid == pColData->info.colId) { if (pData->cid == pColData->info.colId) {
for (int32_t j = pDumpInfo->rowIndex; j < endIndex && j >= 0; j += step) { for (int32_t j = pDumpInfo->rowIndex; j < endIndex && j >= 0; j += step) {
...@@ -1722,13 +1723,16 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1722,13 +1723,16 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader); int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);
int64_t elapsedTime = taosGetTimestampUs() - st; blockDataUpdateTsWindow(pBlock, 0);
tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
pReader, elapsedTime, pBlock->info.rows, (int32_t)blockDataGetNumOfCols(pBlock), pReader->idStr);
pBlock->info.uid = pBlockScanInfo->uid; pBlock->info.uid = pBlockScanInfo->uid;
setComposedBlockFlag(pReader, true); setComposedBlockFlag(pReader, true);
int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64
" us, numOfRows:%d, numOfCols:%d, brange: %" PRId64 " - %" PRId64 " %s",
pReader, elapsedTime, pBlock->info.rows, (int32_t)blockDataGetNumOfCols(pBlock), pBlock->info.window.skey,
pBlock->info.window.ekey, pReader->idStr);
return code; return code;
} }
...@@ -2375,6 +2379,43 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -2375,6 +2379,43 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
} }
STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr) {
if (VND_IS_RSMA(pVnode)) {
int level = 0;
int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
SRetention* pRetention = retentions + level;
if (pRetention->keep <= 0) {
if (level > 0) {
--level;
}
break;
}
if ((now - pRetention->keep) <= winSKey) {
break;
}
++level;
}
int32_t vgId = TD_VID(pVnode);
const char* str = (idStr != NULL)? idStr:"";
if (level == TSDB_RETENTION_L0) {
tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L0, str);
return VND_RSMA0(pVnode);
} else if (level == TSDB_RETENTION_L1) {
tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L1, str);
return VND_RSMA1(pVnode);
} else {
tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L2, str);
return VND_RSMA2(pVnode);
}
}
return VND_TSDB(pVnode);
}
// // todo not unref yet, since it is not support multi-group interpolation query // // todo not unref yet, since it is not support multi-group interpolation query
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) { // static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
// // filter the queried time stamp in the first place // // filter the queried time stamp in the first place
...@@ -2533,12 +2574,12 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -2533,12 +2574,12 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
*state = CHECK_FILEBLOCK_QUIT;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
int32_t nextIndex = -1; int32_t nextIndex = -1;
SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order); SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
if (pNeighborBlock == NULL) { // do nothing if (pNeighborBlock == NULL) { // do nothing
*state = CHECK_FILEBLOCK_QUIT;
return 0; return 0;
} }
...@@ -3280,8 +3321,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_ ...@@ -3280,8 +3321,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_
} }
} }
ASSERT(0); tsdbDebug("%p reset reader, suid:%"PRIu64", numOfTables:%d, query range:%"PRId64" - %"PRId64" in query %s", pReader, pReader->suid,
tsdbDebug("%p reset tsdbreader in query %s", pReader, numOfTables, pReader->idStr); numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
return code; return code;
} }
......
...@@ -902,7 +902,7 @@ static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, S ...@@ -902,7 +902,7 @@ static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, S
ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
if (code) goto _err; if (code) goto _err;
tColDataInit(pColData, pBlockCol->cid, pBlockCol->type, pBlockCol->smaOn); tColDataInit(pColData, pBlockCol->cid, pBlockCol->type, pBlockCol->smaOn);
...@@ -1762,8 +1762,8 @@ static int32_t tsdbWriteBlockSma(TdFilePtr pFD, SBlockData *pBlockData, SSubBloc ...@@ -1762,8 +1762,8 @@ static int32_t tsdbWriteBlockSma(TdFilePtr pFD, SBlockData *pBlockData, SSubBloc
// prepare // prepare
pSubBlock->nSma = 0; pSubBlock->nSma = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aColDataP); iColData++) { for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData); pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
if (IS_VAR_DATA_TYPE(pColData->type) || (!pColData->smaOn)) continue; if (IS_VAR_DATA_TYPE(pColData->type) || (!pColData->smaOn)) continue;
...@@ -1775,8 +1775,8 @@ static int32_t tsdbWriteBlockSma(TdFilePtr pFD, SBlockData *pBlockData, SSubBloc ...@@ -1775,8 +1775,8 @@ static int32_t tsdbWriteBlockSma(TdFilePtr pFD, SBlockData *pBlockData, SSubBloc
code = tRealloc(ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM)); code = tRealloc(ppBuf, sizeof(SColumnDataAgg) * pSubBlock->nSma + sizeof(TSCKSUM));
if (code) goto _err; if (code) goto _err;
n = 0; n = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aColDataP); iColData++) { for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData); pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
if (IS_VAR_DATA_TYPE(pColData->type) || (!pColData->smaOn)) continue; if (IS_VAR_DATA_TYPE(pColData->type) || (!pColData->smaOn)) continue;
...@@ -1834,14 +1834,14 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ ...@@ -1834,14 +1834,14 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
if (code) goto _err; if (code) goto _err;
// COLUMNS // COLUMNS
aBlockCol = taosArrayInit(taosArrayGetSize(pBlockData->aColDataP), sizeof(SBlockCol)); aBlockCol = taosArrayInit(taosArrayGetSize(pBlockData->aIdx), sizeof(SBlockCol));
if (aBlockCol == NULL) { if (aBlockCol == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
int32_t offset = 0; int32_t offset = 0;
for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aColDataP); iCol++) { for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aIdx); iCol++) {
SColData *pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iCol); SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iCol);
ASSERT(pColData->flag); ASSERT(pColData->flag);
......
...@@ -544,8 +544,8 @@ SColVal *tRowIterNext(SRowIter *pIter) { ...@@ -544,8 +544,8 @@ SColVal *tRowIterNext(SRowIter *pIter) {
return &pIter->colVal; return &pIter->colVal;
} }
} else { } else {
if (pIter->i < taosArrayGetSize(pIter->pRow->pBlockData->aColDataP)) { if (pIter->i < taosArrayGetSize(pIter->pRow->pBlockData->aIdx)) {
SColData *pColData = (SColData *)taosArrayGetP(pIter->pRow->pBlockData->aColDataP, pIter->i); SColData *pColData = tBlockDataGetColDataByIdx(pIter->pRow->pBlockData, pIter->i);
tColDataGetValue(pColData, pIter->pRow->iRow, &pIter->colVal); tColDataGetValue(pColData, pIter->pRow->iRow, &pIter->colVal);
pIter->i++; pIter->i++;
...@@ -892,9 +892,9 @@ _exit: ...@@ -892,9 +892,9 @@ _exit:
return code; return code;
} }
static FORCE_INLINE int32_t tColDataPCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) {
SColData *pColData1 = *(SColData **)p1; SColData *pColData1 = (SColData *)p1;
SColData *pColData2 = *(SColData **)p2; SColData *pColData2 = (SColData *)p2;
if (pColData1->cid < pColData2->cid) { if (pColData1->cid < pColData2->cid) {
return -1; return -1;
...@@ -912,14 +912,14 @@ int32_t tBlockDataInit(SBlockData *pBlockData) { ...@@ -912,14 +912,14 @@ int32_t tBlockDataInit(SBlockData *pBlockData) {
pBlockData->nRow = 0; pBlockData->nRow = 0;
pBlockData->aVersion = NULL; pBlockData->aVersion = NULL;
pBlockData->aTSKEY = NULL; pBlockData->aTSKEY = NULL;
pBlockData->aColDataP = taosArrayInit(0, sizeof(SColData *)); pBlockData->aIdx = taosArrayInit(0, sizeof(int32_t));
if (pBlockData->aColDataP == NULL) { if (pBlockData->aIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
pBlockData->aColData = taosArrayInit(0, sizeof(SColData)); pBlockData->aColData = taosArrayInit(0, sizeof(SColData));
if (pBlockData->aColData == NULL) { if (pBlockData->aColData == NULL) {
taosArrayDestroy(pBlockData->aColDataP); taosArrayDestroy(pBlockData->aIdx);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
...@@ -930,13 +930,13 @@ _exit: ...@@ -930,13 +930,13 @@ _exit:
void tBlockDataReset(SBlockData *pBlockData) { void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->nRow = 0; pBlockData->nRow = 0;
taosArrayClear(pBlockData->aColDataP); taosArrayClear(pBlockData->aIdx);
} }
void tBlockDataClear(SBlockData *pBlockData) { void tBlockDataClear(SBlockData *pBlockData) {
tFree((uint8_t *)pBlockData->aVersion); tFree((uint8_t *)pBlockData->aVersion);
tFree((uint8_t *)pBlockData->aTSKEY); tFree((uint8_t *)pBlockData->aTSKEY);
taosArrayDestroy(pBlockData->aColDataP); taosArrayDestroy(pBlockData->aIdx);
taosArrayDestroyEx(pBlockData->aColData, tColDataClear); taosArrayDestroyEx(pBlockData->aColData, tColDataClear);
} }
...@@ -961,8 +961,8 @@ _exit: ...@@ -961,8 +961,8 @@ _exit:
void tBlockDataClearData(SBlockData *pBlockData) { void tBlockDataClearData(SBlockData *pBlockData) {
pBlockData->nRow = 0; pBlockData->nRow = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aColDataP); iColData++) { for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
SColData *pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData); SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
tColDataReset(pColData); tColDataReset(pColData);
} }
} }
...@@ -970,7 +970,7 @@ void tBlockDataClearData(SBlockData *pBlockData) { ...@@ -970,7 +970,7 @@ void tBlockDataClearData(SBlockData *pBlockData) {
int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData) { int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData) {
int32_t code = 0; int32_t code = 0;
SColData *pColData = NULL; SColData *pColData = NULL;
int32_t idx = taosArrayGetSize(pBlockData->aColDataP); int32_t idx = taosArrayGetSize(pBlockData->aIdx);
if (idx >= taosArrayGetSize(pBlockData->aColData)) { if (idx >= taosArrayGetSize(pBlockData->aColData)) {
if (taosArrayPush(pBlockData->aColData, &((SColData){0})) == NULL) { if (taosArrayPush(pBlockData->aColData, &((SColData){0})) == NULL) {
...@@ -980,7 +980,7 @@ int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData ...@@ -980,7 +980,7 @@ int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData
} }
pColData = (SColData *)taosArrayGet(pBlockData->aColData, idx); pColData = (SColData *)taosArrayGet(pBlockData->aColData, idx);
if (taosArrayInsert(pBlockData->aColDataP, iColData, &pColData) == NULL) { if (taosArrayInsert(pBlockData->aIdx, iColData, &idx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
...@@ -1006,7 +1006,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS ...@@ -1006,7 +1006,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
// OTHER // OTHER
int32_t iColData = 0; int32_t iColData = 0;
int32_t nColData = taosArrayGetSize(pBlockData->aColDataP); int32_t nColData = taosArrayGetSize(pBlockData->aIdx);
SRowIter iter = {0}; SRowIter iter = {0};
SRowIter *pIter = &iter; SRowIter *pIter = &iter;
SColData *pColData; SColData *pColData;
...@@ -1015,7 +1015,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS ...@@ -1015,7 +1015,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
ASSERT(nColData > 0); ASSERT(nColData > 0);
tRowIterInit(pIter, pRow, pTSchema); tRowIterInit(pIter, pRow, pTSchema);
pColData = (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData); pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
pColVal = tRowIterNext(pIter); pColVal = tRowIterNext(pIter);
while (pColData) { while (pColData) {
...@@ -1025,12 +1025,12 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS ...@@ -1025,12 +1025,12 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
if (code) goto _err; if (code) goto _err;
pColVal = tRowIterNext(pIter); pColVal = tRowIterNext(pIter);
pColData = ((++iColData) < nColData) ? (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData) : NULL; pColData = ((++iColData) < nColData) ? tBlockDataGetColDataByIdx(pBlockData, iColData) : NULL;
} else if (pColData->cid < pColVal->cid) { } else if (pColData->cid < pColVal->cid) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _err; if (code) goto _err;
pColData = ((++iColData) < nColData) ? (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData) : NULL; pColData = ((++iColData) < nColData) ? tBlockDataGetColDataByIdx(pBlockData, iColData) : NULL;
} else { } else {
pColVal = tRowIterNext(pIter); pColVal = tRowIterNext(pIter);
} }
...@@ -1038,7 +1038,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS ...@@ -1038,7 +1038,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _err; if (code) goto _err;
pColData = ((++iColData) < nColData) ? (SColData *)taosArrayGetP(pBlockData->aColDataP, iColData) : NULL; pColData = ((++iColData) < nColData) ? tBlockDataGetColDataByIdx(pBlockData, iColData) : NULL;
} }
} }
...@@ -1054,33 +1054,33 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlock ...@@ -1054,33 +1054,33 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlock
// set target // set target
int32_t iColData1 = 0; int32_t iColData1 = 0;
int32_t nColData1 = taosArrayGetSize(pBlockData1->aColDataP); int32_t nColData1 = taosArrayGetSize(pBlockData1->aIdx);
int32_t iColData2 = 0; int32_t iColData2 = 0;
int32_t nColData2 = taosArrayGetSize(pBlockData2->aColDataP); int32_t nColData2 = taosArrayGetSize(pBlockData2->aIdx);
SColData *pColData1; SColData *pColData1;
SColData *pColData2; SColData *pColData2;
SColData *pColData; SColData *pColData;
tBlockDataReset(pBlockData); tBlockDataReset(pBlockData);
while (iColData1 < nColData1 && iColData2 < nColData2) { while (iColData1 < nColData1 && iColData2 < nColData2) {
pColData1 = (SColData *)taosArrayGetP(pBlockData1->aColDataP, iColData1); pColData1 = tBlockDataGetColDataByIdx(pBlockData1, iColData1);
pColData2 = (SColData *)taosArrayGetP(pBlockData2->aColDataP, iColData2); pColData2 = tBlockDataGetColDataByIdx(pBlockData2, iColData2);
if (pColData1->cid == pColData2->cid) { if (pColData1->cid == pColData2->cid) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
if (code) goto _exit; if (code) goto _exit;
tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn); tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn);
iColData1++; iColData1++;
iColData2++; iColData2++;
} else if (pColData1->cid < pColData2->cid) { } else if (pColData1->cid < pColData2->cid) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
if (code) goto _exit; if (code) goto _exit;
tColDataInit(pColData, pColData1->cid, pColData1->type, pColData1->smaOn); tColDataInit(pColData, pColData1->cid, pColData1->type, pColData1->smaOn);
iColData1++; iColData1++;
} else { } else {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
if (code) goto _exit; if (code) goto _exit;
tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn); tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn);
...@@ -1089,7 +1089,7 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlock ...@@ -1089,7 +1089,7 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlock
} }
while (iColData1 < nColData1) { while (iColData1 < nColData1) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
if (code) goto _exit; if (code) goto _exit;
tColDataInit(pColData, pColData1->cid, pColData1->type, pColData1->smaOn); tColDataInit(pColData, pColData1->cid, pColData1->type, pColData1->smaOn);
...@@ -1097,7 +1097,7 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlock ...@@ -1097,7 +1097,7 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlock
} }
while (iColData2 < nColData2) { while (iColData2 < nColData2) {
code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData);
if (code) goto _exit; if (code) goto _exit;
tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn); tColDataInit(pColData, pColData2->cid, pColData2->type, pColData2->smaOn);
...@@ -1168,8 +1168,8 @@ int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest) { ...@@ -1168,8 +1168,8 @@ int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest) {
memcpy(pBlockDataDest->aTSKEY, pBlockDataSrc->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow); memcpy(pBlockDataDest->aTSKEY, pBlockDataSrc->aTSKEY, sizeof(TSKEY) * pBlockDataSrc->nRow);
// other // other
for (size_t iColData = 0; iColData < taosArrayGetSize(pBlockDataSrc->aColDataP); iColData++) { for (size_t iColData = 0; iColData < taosArrayGetSize(pBlockDataSrc->aIdx); iColData++) {
pColDataSrc = (SColData *)taosArrayGetP(pBlockDataSrc->aColDataP, iColData); pColDataSrc = tBlockDataGetColDataByIdx(pBlockDataSrc, iColData);
code = tBlockDataAddColData(pBlockDataDest, iColData, &pColDataDest); code = tBlockDataAddColData(pBlockDataDest, iColData, &pColDataDest);
if (code) goto _exit; if (code) goto _exit;
...@@ -1181,17 +1181,36 @@ _exit: ...@@ -1181,17 +1181,36 @@ _exit:
return code; return code;
} }
SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx) {
ASSERT(idx >= 0 && idx < taosArrayGetSize(pBlockData->aIdx));
return (SColData *)taosArrayGet(pBlockData->aColData, *(int32_t *)taosArrayGet(pBlockData->aIdx, idx));
}
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) { void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) {
ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID); ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID);
int32_t lidx = 0;
int32_t ridx = taosArrayGetSize(pBlockData->aIdx) - 1;
int32_t midx;
SColData *pColData = &(SColData){.cid = cid}; while (lidx <= ridx) {
SColData *pColData;
int32_t c;
void *p = taosArraySearch(pBlockData->aColDataP, &pColData, tColDataPCmprFn, TD_EQ); midx = (lidx + midx) / 2;
if (p == NULL) {
*ppColData = NULL; pColData = tBlockDataGetColDataByIdx(pBlockData, midx);
c = tColDataCmprFn(pColData, &(SColData){.cid = cid});
if (c == 0) {
*ppColData = pColData;
return;
} else if (c < 0) {
lidx = midx + 1;
} else { } else {
*ppColData = *(SColData **)p; ridx = midx - 1;
}
} }
*ppColData = NULL;
} }
// ALGORITHM ============================== // ALGORITHM ==============================
......
...@@ -305,9 +305,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, ...@@ -305,9 +305,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
SNode* pTagIndexCond = (SNode*)pListInfo->pTagIndexCond; SNode* pTagIndexCond = (SNode*)pListInfo->pTagIndexCond;
if (pScanNode->tableType == TSDB_SUPER_TABLE) { if (pScanNode->tableType == TSDB_SUPER_TABLE) {
if (pTagIndexCond) { if (pTagIndexCond) {
SIndexMetaArg metaArg = {
.metaEx = metaHandle, .idx = vnodeGetIdx(metaHandle), .ivtIdx = vnodeGetIvtIdx(metaHandle), .suid = tableUid};
SArray* res = taosArrayInit(8, sizeof(uint64_t)); SArray* res = taosArrayInit(8, sizeof(uint64_t));
// code = doFilterTag(pTagIndexCond, &metaArg, res); // code = doFilterTag(pTagIndexCond, &metaArg, res);
code = TSDB_CODE_INDEX_REBUILDING; code = TSDB_CODE_INDEX_REBUILDING;
......
...@@ -163,8 +163,8 @@ ...@@ -163,8 +163,8 @@
# --- sma # --- sma
./test.sh -f tsim/sma/drop_sma.sim ./test.sh -f tsim/sma/drop_sma.sim
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim ./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim #./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim #./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
# --- valgrind # --- valgrind
./test.sh -f tsim/valgrind/checkError.sim -v ./test.sh -f tsim/valgrind/checkError.sim -v
......
...@@ -97,23 +97,23 @@ if $data01 != 8 then ...@@ -97,23 +97,23 @@ if $data01 != 8 then
goto loop1 goto loop1
endi endi
if $data02 != 4 then if $data02 != 6 then
print =====data02=$data02 print =====data02=$data02
goto loop1 goto loop1
endi endi
if $data03 != 4 then if $data03 != 52 then
print ======$data03 print ======data03=$data03
goto loop1 goto loop1
endi endi
if $data04 != 52 then if $data04 != 52 then
print ======$data04 print ======data04=$data04
goto loop1 goto loop1
endi endi
if $data05 != 13 then if $data05 != 13 then
print ======$data05 print ======data05=$data05
goto loop1 goto loop1
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册