提交 22ad3129 编写于 作者: D dapan1121

fix: query schema is old issue

上级 ad0505b2
...@@ -178,7 +178,7 @@ int32_t getJsonValueLen(const char* data); ...@@ -178,7 +178,7 @@ int32_t getJsonValueLen(const char* data);
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows); int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, int32_t numOfRow2); const SColumnInfoData* pSource, int32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
......
...@@ -147,9 +147,17 @@ int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) { ...@@ -147,9 +147,17 @@ int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData, static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
int32_t itemLen, int32_t numOfRows) { int32_t itemLen, int32_t numOfRows, bool trimValue) {
ASSERT(pColumnInfoData->info.bytes >= itemLen); if (pColumnInfoData->info.bytes < itemLen) {
uWarn("column/tag actual data len %d is bigger than schema len %d, trim it:%d", itemLen, pColumnInfoData->info.bytes, trimValue);
if (trimValue) {
itemLen = pColumnInfoData->info.bytes;
} else {
return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
}
}
size_t start = 1; size_t start = 1;
// the first item // the first item
...@@ -178,10 +186,12 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren ...@@ -178,10 +186,12 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren
pColumnInfoData->varmeta.length += numOfRows * itemLen; pColumnInfoData->varmeta.length += numOfRows * itemLen;
} }
return TSDB_CODE_SUCCESS;
} }
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
uint32_t numOfRows) { uint32_t numOfRows, bool trimValue) {
int32_t len = pColumnInfoData->info.bytes; int32_t len = pColumnInfoData->info.bytes;
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
len = varDataTLen(pData); len = varDataTLen(pData);
...@@ -193,8 +203,7 @@ int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, ...@@ -193,8 +203,7 @@ int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
} }
} }
doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows); return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, trimValue);
return TSDB_CODE_SUCCESS;
} }
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource, static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
......
...@@ -182,7 +182,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL ...@@ -182,7 +182,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr); void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
void tsdbReaderClose(STsdbReader *pReader); void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader); int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
void tsdbReleaseDataBlock(STsdbReader *pReader); void tsdbReleaseDataBlock(STsdbReader *pReader);
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
......
...@@ -949,14 +949,17 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int ...@@ -949,14 +949,17 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int
pDumpInfo->lastKey = maxKey + step; pDumpInfo->lastKey = maxKey + step;
} }
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
SBlockLoadSuppInfo* pSup) { SBlockLoadSuppInfo* pSup) {
if (IS_VAR_DATA_TYPE(pColVal->type)) { if (IS_VAR_DATA_TYPE(pColVal->type)) {
if (!COL_VAL_IS_VALUE(pColVal)) { if (!COL_VAL_IS_VALUE(pColVal)) {
colDataSetNULL(pColInfoData, rowIndex); colDataSetNULL(pColInfoData, rowIndex);
} else { } else {
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData); varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
ASSERT(pColVal->value.nData <= pColInfoData->info.bytes); if (pColVal->value.nData > pColInfoData->info.bytes) {
tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData, pColInfoData->info.bytes);
return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
}
if (pColVal->value.nData > 0) { // pData may be null, if nData is 0 if (pColVal->value.nData > 0) { // pData may be null, if nData is 0
memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData); memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
} }
...@@ -966,6 +969,8 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_ ...@@ -966,6 +969,8 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
} else { } else {
colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal)); colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
} }
return TSDB_CODE_SUCCESS;
} }
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
...@@ -1167,6 +1172,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { ...@@ -1167,6 +1172,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
SDataBlk* pBlock = getCurrentBlock(pBlockIter); SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SSDataBlock* pResBlock = pReader->pResBlock; SSDataBlock* pResBlock = pReader->pResBlock;
int32_t numOfOutputCols = pSupInfo->numOfCols; int32_t numOfOutputCols = pSupInfo->numOfCols;
int32_t code = TSDB_CODE_SUCCESS;
SColVal cv = {0}; SColVal cv = {0};
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
...@@ -1244,7 +1250,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { ...@@ -1244,7 +1250,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
} else { // varchar/nchar type } else { // varchar/nchar type
for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) { for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) {
tColDataGetValue(pData, j, &cv); tColDataGetValue(pData, j, &cv);
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo); code = doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
if (code) {
return code;
}
} }
} }
} }
...@@ -1776,23 +1785,29 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1776,23 +1785,29 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
} }
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key, static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
SFileBlockDumpInfo* pDumpInfo) { SFileBlockDumpInfo* pDumpInfo, bool *copied) {
// opt version // opt version
// 1. it is not a border point // 1. it is not a border point
// 2. the direct next point is not an duplicated timestamp // 2. the direct next point is not an duplicated timestamp
int32_t code = TSDB_CODE_SUCCESS;
*copied = false;
bool asc = (pReader->order == TSDB_ORDER_ASC); bool asc = (pReader->order == TSDB_ORDER_ASC);
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) { if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) {
int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1; int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1;
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
if (nextKey != key) { // merge is not needed if (nextKey != key) { // merge is not needed
doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
if (code) {
return code;
}
pDumpInfo->rowIndex += step; pDumpInfo->rowIndex += step;
return true; *copied = true;
} }
} }
return false; return code;
} }
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
...@@ -1819,20 +1834,34 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc ...@@ -1819,20 +1834,34 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
} }
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader, static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) { STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, bool *copied) {
int32_t code = TSDB_CODE_SUCCESS;
*copied = false;
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange); bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
if (hasVal) { if (hasVal) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 != ts) { if (next1 != ts) {
doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow); code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
return true; if (code) {
return code;
}
*copied = true;
return code;
} }
} else { } else {
doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow); code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
return true; if (code) {
return code;
}
*copied = true;
return code;
} }
return false; return code;
} }
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) { static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
...@@ -2022,11 +2051,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2022,11 +2051,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge); tsdbRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
return code;
} }
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader, static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
...@@ -2034,7 +2064,8 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -2034,7 +2064,8 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
bool mergeBlockData) { bool mergeBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
bool copied = false;
int32_t code = TSDB_CODE_SUCCESS;
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
...@@ -2042,7 +2073,12 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -2042,7 +2073,12 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
// only last block exists // only last block exists
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) { if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) { code = tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied);
if (code) {
return code;
}
if (copied) {
pBlockScanInfo->lastKey = tsLastBlock; pBlockScanInfo->lastKey = tsLastBlock;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
...@@ -2060,10 +2096,15 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -2060,10 +2096,15 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code; return code;
} }
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge); tsdbRowMergerClear(&merge);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
} else { // not merge block data } else { // not merge block data
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
...@@ -2083,10 +2124,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -2083,10 +2124,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code; return code;
} }
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge); tsdbRowMergerClear(&merge);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2131,7 +2176,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -2131,7 +2176,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
return code; return code;
} }
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge); tsdbRowMergerClear(&merge);
...@@ -2353,7 +2398,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -2353,7 +2398,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge); tsdbRowMergerClear(&merge);
...@@ -2508,7 +2553,13 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* ...@@ -2508,7 +2553,13 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo*
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader) { STsdbReader* pReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { bool copied = false;
int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied);
if (code) {
return code;
}
if (copied) {
pBlockScanInfo->lastKey = key; pBlockScanInfo->lastKey = key;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
...@@ -2528,11 +2579,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc ...@@ -2528,11 +2579,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
return code; return code;
} }
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge); tsdbRowMergerClear(&merge);
return TSDB_CODE_SUCCESS; return code;
} }
} }
...@@ -2649,7 +2700,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -2649,7 +2700,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) && if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
pBlock->nRow <= pReader->capacity) { pBlock->nRow <= pReader->capacity) {
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) { if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
copyBlockDataToSDataBlock(pReader); code = copyBlockDataToSDataBlock(pReader);
if (code) {
goto _end;
}
// record the last key value // record the last key value
pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts; pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
...@@ -2696,8 +2750,11 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -2696,8 +2750,11 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
break; break;
} }
buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
if (code) {
goto _end;
}
// currently loaded file data block is consumed // currently loaded file data block is consumed
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
...@@ -2922,6 +2979,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2922,6 +2979,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader; SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
STableUidList* pUidList = &pStatus->uidList; STableUidList* pUidList = &pStatus->uidList;
int32_t code = TSDB_CODE_SUCCESS;
if (taosHashGetSize(pStatus->pTableMap) == 0) { if (taosHashGetSize(pStatus->pTableMap) == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2952,7 +3010,11 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2952,7 +3010,11 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
break; break;
} }
buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader); code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
if (code) {
return code;
}
if (pResBlock->info.rows >= pReader->capacity) { if (pResBlock->info.rows >= pReader->capacity) {
break; break;
} }
...@@ -3032,7 +3094,11 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -3032,7 +3094,11 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
break; break;
} }
buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader); code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
if (code) {
return code;
}
if (pResBlock->info.rows >= pReader->capacity) { if (pResBlock->info.rows >= pReader->capacity) {
break; break;
} }
...@@ -3784,6 +3850,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR ...@@ -3784,6 +3850,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) { int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) {
int32_t outputRowIndex = pBlock->info.rows; int32_t outputRowIndex = pBlock->info.rows;
int64_t uid = pScanInfo->uid; int64_t uid = pScanInfo->uid;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
...@@ -3806,7 +3873,10 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT ...@@ -3806,7 +3873,10 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
tRowGet(pTSRow, pSchema, j, &colVal); tRowGet(pTSRow, pSchema, j, &colVal);
doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo); code = doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
if (code) {
return code;
}
i += 1; i += 1;
j += 1; j += 1;
} else if (colId < pSchema->columns[j].colId) { } else if (colId < pSchema->columns[j].colId) {
...@@ -3836,6 +3906,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S ...@@ -3836,6 +3906,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t rowIndex) { int32_t rowIndex) {
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
int32_t outputRowIndex = pResBlock->info.rows; int32_t outputRowIndex = pResBlock->info.rows;
int32_t code = TSDB_CODE_SUCCESS;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
if (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) { if (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
...@@ -3858,7 +3929,10 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S ...@@ -3858,7 +3929,10 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[i]); SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[i]);
if (pData->cid == pSupInfo->colId[i]) { if (pData->cid == pSupInfo->colId[i]) {
tColDataGetValue(pData, rowIndex, &cv); tColDataGetValue(pData, rowIndex, &cv);
doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo); code = doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
if (code) {
return code;
}
j += 1; j += 1;
} else if (pData->cid > pCol->info.colId) { } else if (pData->cid > pCol->info.colId) {
// the specified column does not exist in file block, fill with null data // the specified column does not exist in file block, fill with null data
...@@ -3882,6 +3956,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S ...@@ -3882,6 +3956,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader) { STsdbReader* pReader) {
SSDataBlock* pBlock = pReader->pResBlock; SSDataBlock* pBlock = pReader->pResBlock;
int32_t code = TSDB_CODE_SUCCESS;
do { do {
// SRow* pTSRow = NULL; // SRow* pTSRow = NULL;
...@@ -3893,13 +3968,20 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e ...@@ -3893,13 +3968,20 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
} }
if (row.type == TSDBROW_ROW_FMT) { if (row.type == TSDBROW_ROW_FMT) {
doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo); code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
if (freeTSRow) { if (freeTSRow) {
taosMemoryFree(row.pTSRow); taosMemoryFree(row.pTSRow);
} }
if (code) {
return code;
}
} else { } else {
doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
if (code) {
break;
}
} }
// no data in buffer, return immediately // no data in buffer, return immediately
...@@ -3912,7 +3994,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e ...@@ -3912,7 +3994,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
} }
} while (1); } while (1);
return TSDB_CODE_SUCCESS; return code;
} }
// TODO refactor: with createDataBlockScanInfo // TODO refactor: with createDataBlockScanInfo
...@@ -4394,43 +4476,52 @@ _err: ...@@ -4394,43 +4476,52 @@ _err:
return code; return code;
} }
static bool doTsdbNextDataBlock(STsdbReader* pReader) { static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
int32_t code = TSDB_CODE_SUCCESS;
// cleanup the data that belongs to the previous data block // cleanup the data that belongs to the previous data block
SSDataBlock* pBlock = pReader->pResBlock; SSDataBlock* pBlock = pReader->pResBlock;
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
*hasNext = false;
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
if (taosHashGetSize(pStatus->pTableMap) == 0) { if (taosHashGetSize(pStatus->pTableMap) == 0) {
return false; return code;
} }
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
int32_t code = buildBlockFromFiles(pReader); code = buildBlockFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return code;
} }
if (pBlock->info.rows > 0) { if (pBlock->info.rows > 0) {
return true; *hasNext = true;
} else { } else {
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
buildBlockFromBufferSequentially(pReader); code = buildBlockFromBufferSequentially(pReader);
return pBlock->info.rows > 0;
} }
} else { // no data in files, let's try the buffer } else { // no data in files, let's try the buffer
buildBlockFromBufferSequentially(pReader); code = buildBlockFromBufferSequentially(pReader);
return pBlock->info.rows > 0; *hasNext = pBlock->info.rows > 0;
} }
return code;
} }
bool tsdbNextDataBlock(STsdbReader* pReader) { int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
int32_t code = TSDB_CODE_SUCCESS;
*hasNext = false;
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) { if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) {
return false; return code;
} }
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
int32_t code = tsdbAcquireReader(pReader); code = tsdbAcquireReader(pReader);
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
if (pReader->suspended) { if (pReader->suspended) {
...@@ -4438,16 +4529,21 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -4438,16 +4529,21 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
} }
if (pReader->innerReader[0] != NULL && pReader->step == 0) { if (pReader->innerReader[0] != NULL && pReader->step == 0) {
bool ret = doTsdbNextDataBlock(pReader->innerReader[0]); code = doTsdbNextDataBlock(pReader->innerReader[0], hasNext);
if (code) {
tsdbReleaseReader(pReader);
return code;
}
pReader->step = EXTERNAL_ROWS_PREV; pReader->step = EXTERNAL_ROWS_PREV;
if (ret) { if (*hasNext) {
pStatus = &pReader->innerReader[0]->status; pStatus = &pReader->innerReader[0]->status;
if (pStatus->composedDataBlock) { if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader); qTrace("tsdb/read: %p, unlock read mutex", pReader);
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
} }
return ret; return code;
} }
} }
...@@ -4464,14 +4560,19 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -4464,14 +4560,19 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
pReader->step = EXTERNAL_ROWS_MAIN; pReader->step = EXTERNAL_ROWS_MAIN;
} }
bool ret = doTsdbNextDataBlock(pReader); code = doTsdbNextDataBlock(pReader, hasNext);
if (ret) { if (code != TSDB_CODE_SUCCESS) {
tsdbReleaseReader(pReader);
return code;
}
if (*hasNext) {
if (pStatus->composedDataBlock) { if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader); qTrace("tsdb/read: %p, unlock read mutex", pReader);
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
} }
return ret; return code;
} }
if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) { if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
...@@ -4483,23 +4584,28 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -4483,23 +4584,28 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
return code; return code;
} }
ret = doTsdbNextDataBlock(pReader->innerReader[1]); code = doTsdbNextDataBlock(pReader->innerReader[1], hasNext);
if (code != TSDB_CODE_SUCCESS) {
tsdbReleaseReader(pReader);
return code;
}
pReader->step = EXTERNAL_ROWS_NEXT; pReader->step = EXTERNAL_ROWS_NEXT;
if (ret) { if (*hasNext) {
pStatus = &pReader->innerReader[1]->status; pStatus = &pReader->innerReader[1]->status;
if (pStatus->composedDataBlock) { if (pStatus->composedDataBlock) {
qTrace("tsdb/read: %p, unlock read mutex", pReader); qTrace("tsdb/read: %p, unlock read mutex", pReader);
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
} }
return ret; return code;
} }
} }
qTrace("tsdb/read: %p, unlock read mutex", pReader); qTrace("tsdb/read: %p, unlock read mutex", pReader);
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
return false; return code;
} }
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) { static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
...@@ -4644,20 +4750,27 @@ STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, co ...@@ -4644,20 +4750,27 @@ STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, co
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr); STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) { if (pBlockScanInfo == NULL) {
return NULL; return NULL;
} }
int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid); code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
tBlockDataDestroy(&pStatus->fileBlockData);
terrno = code;
return NULL;
}
code = copyBlockDataToSDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tBlockDataDestroy(&pStatus->fileBlockData); tBlockDataDestroy(&pStatus->fileBlockData);
terrno = code; terrno = code;
return NULL; return NULL;
} }
copyBlockDataToSDataBlock(pReader);
return pReader->pResBlock; return pReader->pResBlock;
} }
......
...@@ -584,10 +584,16 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int ...@@ -584,10 +584,16 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
if (isNullVal) { if (isNullVal) {
colDataSetNNULL(pColInfoData, 0, pBlock->info.rows); colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) { } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows); code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) { if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
taosMemoryFree(data); taosMemoryFree(data);
} }
if (code) {
if (freeReader) {
metaReaderClear(&mr);
}
return code;
}
} else { // todo opt for json tag } else { // todo opt for json tag
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataSetVal(pColInfoData, i, data, false); colDataSetVal(pColInfoData, i, data, false);
...@@ -634,10 +640,22 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ...@@ -634,10 +640,22 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* pBlock = pTableScanInfo->pResBlock; SSDataBlock* pBlock = pTableScanInfo->pResBlock;
bool hasNext = false;
int32_t code = TSDB_CODE_SUCCESS;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) { while (true) {
code = tsdbNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
if (code) {
tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
T_LONG_JMP(pTaskInfo->env, code);
}
if (!hasNext) {
break;
}
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
tsdbReleaseDataBlock(pTableScanInfo->base.dataReader); tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
...@@ -1015,7 +1033,15 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU ...@@ -1015,7 +1033,15 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
return NULL; return NULL;
} }
if (tsdbNextDataBlock(pReader)) { bool hasNext = false;
code = tsdbNextDataBlock(pReader, &hasNext);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
T_LONG_JMP(pTaskInfo->env, code);
return NULL;
}
if (hasNext) {
/*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL); /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
...@@ -2104,12 +2130,22 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { ...@@ -2104,12 +2130,22 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS;
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
pTaskInfo->streamInfo.metaRsp.metaRsp = NULL; pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
qDebug("tmqsnap doRawScan called"); qDebug("tmqsnap doRawScan called");
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) { bool hasNext = false;
if (pInfo->dataReader) {
code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
if (code) {
tsdbReleaseDataBlock(pInfo->dataReader);
longjmp(pTaskInfo->env, code);
}
}
if (pInfo->dataReader && hasNext) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
tsdbReleaseDataBlock(pInfo->dataReader); tsdbReleaseDataBlock(pInfo->dataReader);
longjmp(pTaskInfo->env, pTaskInfo->code); longjmp(pTaskInfo->env, pTaskInfo->code);
...@@ -2610,8 +2646,21 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -2610,8 +2646,21 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
pInfo->base.dataReader = source->dataReader; pInfo->base.dataReader = source->dataReader;
STsdbReader* reader = pInfo->base.dataReader; STsdbReader* reader = pInfo->base.dataReader;
bool hasNext = false;
qTrace("tsdb/read-table-data: %p, enter next reader", reader); qTrace("tsdb/read-table-data: %p, enter next reader", reader);
while (tsdbNextDataBlock(reader)) {
while (true) {
code = tsdbNextDataBlock(reader, &hasNext);
if (code != 0) {
tsdbReleaseDataBlock(reader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
}
if (!hasNext) {
break;
}
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
tsdbReleaseDataBlock(reader); tsdbReleaseDataBlock(reader);
pInfo->base.dataReader = NULL; pInfo->base.dataReader = NULL;
......
...@@ -1627,7 +1627,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan ...@@ -1627,7 +1627,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0}; char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(varTbName, name); STR_TO_VARSTR(varTbName, name);
colDataSetNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows); colDataSetNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows, true);
} }
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
......
...@@ -1755,7 +1755,11 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p ...@@ -1755,7 +1755,11 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
char* p = colDataGetVarData(pInput->columnData, 0); char* p = colDataGetVarData(pInput->columnData, 0);
colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows); int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true);
if (code) {
return code;
}
pOutput->numOfRows += pInput->numOfRows; pOutput->numOfRows += pInput->numOfRows;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
...@@ -22,6 +24,7 @@ ...@@ -22,6 +24,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
......
import taos
import sys
import time
import socket
import os
import platform
if platform.system().lower() == 'windows':
import wexpect as taosExpect
else:
import pexpect as taosExpect
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
def taos_command (buildPath, key, value, expectString, sqlString=''):
if len(key) == 0:
tdLog.exit("taos test key is null!")
if platform.system().lower() == 'windows':
taosCmd = buildPath + '\\build\\bin\\taos.exe '
taosCmd = taosCmd.replace('\\','\\\\')
else:
taosCmd = buildPath + '/build/bin/taos '
cfgPath = buildPath + "/../sim/psim/cfg"
taosCmd = taosCmd + ' -c' + cfgPath + ' -' + key
if len(value) != 0:
taosCmd = taosCmd + ' ' + value
tdLog.info ("taos cmd: %s" % taosCmd)
child = taosExpect.spawn(taosCmd, timeout=20)
#output = child.readline()
#print (output.decode())
if len(expectString) != 0:
i = child.expect([expectString, taosExpect.TIMEOUT, taosExpect.EOF], timeout=20)
else:
i = child.expect([taosExpect.TIMEOUT, taosExpect.EOF], timeout=20)
if platform.system().lower() == 'windows':
retResult = child.before
else:
retResult = child.before.decode()
print(retResult)
#print(child.after.decode())
if i == 0:
print ('taos login success! Here can run sql, taos> ')
return "TAOS_OK"
else:
return "TAOS_FAIL"
class TDTestCase:
#updatecfgDict = {'clientCfg': {'serverPort': 7080, 'firstEp': 'trd02:7080', 'secondEp':'trd02:7080'},\
# 'serverPort': 7080, 'firstEp': 'trd02:7080'}
hostname = socket.gethostname()
if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
try:
config = eval(tdDnodes.dnodes[0].remoteIP)
hostname = config["host"]
except Exception:
hostname = tdDnodes.dnodes[0].remoteIP
serverPort = '7080'
rpcDebugFlagVal = '143'
clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
clientCfgDict["serverPort"] = serverPort
clientCfgDict["firstEp"] = hostname + ':' + serverPort
clientCfgDict["secondEp"] = hostname + ':' + serverPort
clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
clientCfgDict["fqdn"] = hostname
updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
updatecfgDict["clientCfg"] = clientCfgDict
updatecfgDict["serverPort"] = serverPort
updatecfgDict["firstEp"] = hostname + ':' + serverPort
updatecfgDict["secondEp"] = hostname + ':' + serverPort
updatecfgDict["fqdn"] = hostname
print ("===================: ", updatecfgDict)
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
# time.sleep(2)
tdSql.query("create user testpy pass 'testpy'")
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!")
else:
tdLog.info("taosd found in %s" % buildPath)
cfgPath = buildPath + "/../sim/psim/cfg"
tdLog.info("cfgPath: %s" % cfgPath)
checkNetworkStatus = ['0: unavailable', '1: network ok', '2: service ok', '3: service degraded', '4: exiting']
netrole = ['client', 'server']
keyDict = {'h':'', 'P':'6030', 'p':'testpy', 'u':'testpy', 'a':'', 'A':'', 'c':'', 'C':'', 's':'', 'r':'', 'f':'', \
'k':'', 't':'', 'n':'', 'l':'1024', 'N':'100', 'V':'', 'd':'db', 'w':'30', '-help':'', '-usage':'', '?':''}
keyDict['h'] = self.hostname
keyDict['c'] = cfgPath
keyDict['P'] = self.serverPort
tdSql.query("drop database if exists db1")
tdSql.query("create database if not exists db1 vgroups 1")
tdSql.query("use db1")
tdSql.query("create table tba (ts timestamp, f1 binary(2))")
tdSql.query("insert into tba values (now, '22')")
tdSql.query("select * from tba")
tdSql.checkData(0, 1, '22')
keyDict['s'] = "\"alter table db1.tba modify column f1 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"insert into db1.tba values (now, '55555')\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
tdSql.query("select * from tba order by ts")
tdSql.checkData(0, 1, '22')
tdSql.checkData(1, 1, '55555')
tdSql.query("create table stb (ts timestamp, f1 int) tags (tg1 binary(2))")
tdSql.query("create table tb1 using stb tags('bb')")
tdSql.query("insert into tb1 values (now, 2)")
tdSql.query("select count(*) from stb group by tg1")
tdSql.checkData(0, 0, 1)
keyDict['s'] = "\"alter table db1.stb modify tag tg1 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"create table db1.tb2 using db1.stb tags('bbbbb')\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"insert into db1.tb2 values (now, 2)\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
tdSql.query("select count(*) from stb group by tg1")
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 1)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册