提交 95dd30be 编写于 作者: H Haojun Liao

fix(query): fix coverity issues.

上级 9490075f
...@@ -438,21 +438,18 @@ int taos_init() { ...@@ -438,21 +438,18 @@ int taos_init() {
} }
int taos_options_imp(TSDB_OPTION option, const char *str) { int taos_options_imp(TSDB_OPTION option, const char *str) {
if (option != TSDB_OPTION_CONFIGDIR) { if (option == TSDB_OPTION_CONFIGDIR) {
taos_init(); // initialize global config
} else {
tstrncpy(configDir, str, PATH_MAX); tstrncpy(configDir, str, PATH_MAX);
tscInfo("set cfg:%s to %s", configDir, str); tscInfo("set cfg:%s to %s", configDir, str);
return 0; return 0;
} else {
taos_init(); // initialize global config
} }
SConfig *pCfg = taosGetCfg(); SConfig *pCfg = taosGetCfg();
SConfigItem *pItem = NULL; SConfigItem *pItem = NULL;
switch (option) { switch (option) {
case TSDB_OPTION_CONFIGDIR:
pItem = cfgGetItem(pCfg, "configDir");
break;
case TSDB_OPTION_SHELL_ACTIVITY_TIMER: case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
pItem = cfgGetItem(pCfg, "shellActivityTimer"); pItem = cfgGetItem(pCfg, "shellActivityTimer");
break; break;
......
...@@ -1130,7 +1130,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM ...@@ -1130,7 +1130,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
break; break;
} }
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { // TODO weired responding code?
if (TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno; pRequest->code = terrno;
} }
} }
......
...@@ -645,11 +645,11 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN ...@@ -645,11 +645,11 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks; int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug( tsdbDebug("load block of %" PRIzu
"load block of %"PRIzu" tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed " " tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"time:%.2f ms %s", "time:%.2f ms %s",
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el, numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
pReader->idStr); pReader->idStr);
pReader->cost.numOfBlocks += total; pReader->cost.numOfBlocks += total;
pReader->cost.headFileLoadTime += el; pReader->cost.headFileLoadTime += el;
...@@ -1579,7 +1579,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1579,7 +1579,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (pReader->order == TSDB_ORDER_ASC) { if (pReader->order == TSDB_ORDER_ASC) {
if (minKey == key) { if (minKey == key) {
init = true; init = true;
tRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
} }
...@@ -1589,7 +1592,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1589,7 +1592,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge(&merge, &fRow1); tRowMerge(&merge, &fRow1);
} else { } else {
init = true; init = true;
tRowMergerInit(&merge, &fRow1, pReader->pSchema); int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
} }
...@@ -1600,16 +1606,29 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1600,16 +1606,29 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} else { } else {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pSchema); int32_t code = tRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
} }
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
} }
} else { } else {
if (minKey == k.ts) { if (minKey == k.ts) {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pSchema); int32_t code = tRowMergerInit(&merge, pRow, pSchema);
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
if (minKey == tsLast) { if (minKey == tsLast) {
...@@ -1618,7 +1637,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1618,7 +1637,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge(&merge, &fRow1); tRowMerge(&merge, &fRow1);
} else { } else {
init = true; init = true;
tRowMergerInit(&merge, &fRow1, pReader->pSchema); int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
} }
...@@ -1628,7 +1650,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1628,7 +1650,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge(&merge, &fRow); tRowMerge(&merge, &fRow);
} else { } else {
init = true; init = true;
tRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
} }
...@@ -1662,13 +1687,16 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -1662,13 +1687,16 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) { if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
tRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tRowMerge(&merge, &fRow1); tRowMerge(&merge, &fRow1);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
int32_t code = tRowMergerGetRow(&merge, &pTSRow); code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1679,7 +1707,11 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -1679,7 +1707,11 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
tRowMergerClear(&merge); tRowMergerClear(&merge);
} }
} else { // not merge block data } else { // not merge block data
tRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
ASSERT(mergeBlockData); ASSERT(mergeBlockData);
...@@ -1688,7 +1720,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, ...@@ -1688,7 +1720,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
} }
int32_t code = tRowMergerGetRow(&merge, &pTSRow); code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1724,7 +1756,11 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -1724,7 +1756,11 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
tRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
...@@ -1732,7 +1768,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -1732,7 +1768,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
int32_t code = tRowMergerGetRow(&merge, &pTSRow); code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1756,9 +1792,9 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader ...@@ -1756,9 +1792,9 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
SLastBlockReader* pLastBlockReader) { SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0}; SRowMerger merge = {0};
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SArray* pDelList = pBlockScanInfo->delSkyline; SArray* pDelList = pBlockScanInfo->delSkyline;
...@@ -1850,12 +1886,17 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1850,12 +1886,17 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} else { } else {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
int32_t code = tRowMergerInit(&merge, piRow, pSchema); code = tRowMergerInit(&merge, piRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline,
&merge, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
if (minKey == k.ts) { if (minKey == k.ts) {
...@@ -1863,23 +1904,31 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1863,23 +1904,31 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge(&merge, pRow); tRowMerge(&merge, pRow);
} else { } else {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
int32_t code = tRowMergerInit(&merge, pRow, pSchema); code = tRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
} else { } else {
if (minKey == k.ts) { if (minKey == k.ts) {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
int32_t code = tRowMergerInit(&merge, pRow, pSchema); code = tRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
if (minKey == ik.ts) { if (minKey == ik.ts) {
...@@ -1888,12 +1937,16 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1888,12 +1937,16 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} else { } else {
init = true; init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
int32_t code = tRowMergerInit(&merge, piRow, pSchema); code = tRowMergerInit(&merge, piRow, pSchema);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
if (minKey == tsLast) { if (minKey == tsLast) {
...@@ -1924,7 +1977,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1924,7 +1977,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} }
int32_t code = tRowMergerGetRow(&merge, &pTSRow); code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2085,9 +2138,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc ...@@ -2085,9 +2138,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
tRowMergerInit(&merge, &fRow, pReader->pSchema); int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
int32_t code = tRowMergerGetRow(&merge, &pTSRow); code = tRowMergerGetRow(&merge, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2925,6 +2982,10 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe ...@@ -2925,6 +2982,10 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
} }
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid); STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, uid);
if (pTSchema == NULL) {
return terrno;
}
tRowMergerAdd(pMerger, pRow, pTSchema); tRowMergerAdd(pMerger, pRow, pTSchema);
} }
...@@ -3094,13 +3155,17 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, ...@@ -3094,13 +3155,17 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
} }
STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid); STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
if(pTSchema1 == NULL) { if (pTSchema1 == NULL) {
return terrno; return terrno;
} }
tRowMergerAdd(&merge, pNextRow, pTSchema1); tRowMergerAdd(&merge, pNextRow, pTSchema1);
doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader); code = doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tRowMergerGetRow(&merge, pTSRow); code = tRowMergerGetRow(&merge, pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -3121,19 +3186,44 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p ...@@ -3121,19 +3186,44 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, piRow, pSchema); int32_t code = tRowMergerInit(&merge, piRow, pSchema);
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tRowMerge(&merge, pRow); tRowMerge(&merge, pRow);
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); code =
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else { } else {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pSchema); int32_t code = tRowMergerInit(&merge, pRow, pSchema);
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); if (code != TSDB_CODE_SUCCESS) {
return code;
}
code =
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tRowMerge(&merge, piRow); tRowMerge(&merge, piRow);
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
int32_t code = tRowMergerGetRow(&merge, pTSRow); int32_t code = tRowMergerGetRow(&merge, pTSRow);
...@@ -3457,18 +3547,18 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3457,18 +3547,18 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
STsdbReader* pPrevReader = pReader->innerReader[0]; STsdbReader* pPrevReader = pReader->innerReader[0];
STsdbReader* pNextReader = pReader->innerReader[1]; STsdbReader* pNextReader = pReader->innerReader[1];
// we need only one row // we need only one row
pPrevReader->capacity = 1; pPrevReader->capacity = 1;
pPrevReader->status.pTableMap = pReader->status.pTableMap; pPrevReader->status.pTableMap = pReader->status.pTableMap;
pPrevReader->pSchema = pReader->pSchema; pPrevReader->pSchema = pReader->pSchema;
pPrevReader->pMemSchema = pReader->pMemSchema; pPrevReader->pMemSchema = pReader->pMemSchema;
pPrevReader->pReadSnap = pReader->pReadSnap; pPrevReader->pReadSnap = pReader->pReadSnap;
pNextReader->capacity = 1; pNextReader->capacity = 1;
pNextReader->status.pTableMap = pReader->status.pTableMap; pNextReader->status.pTableMap = pReader->status.pTableMap;
pNextReader->pSchema = pReader->pSchema; pNextReader->pSchema = pReader->pSchema;
pNextReader->pMemSchema = pReader->pMemSchema; pNextReader->pMemSchema = pReader->pMemSchema;
pNextReader->pReadSnap = pReader->pReadSnap; pNextReader->pReadSnap = pReader->pReadSnap;
code = doOpenReaderImpl(pPrevReader); code = doOpenReaderImpl(pPrevReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -122,7 +122,8 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in ...@@ -122,7 +122,8 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) { if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
__compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc; __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, fn); int32_t size = POINTER_BYTES;
taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), size, fn);
} }
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
......
...@@ -276,6 +276,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -276,6 +276,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf, code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
&keyInfo.groupId); &keyInfo.groupId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(keyBuf);
return code; return code;
} }
} }
......
...@@ -97,11 +97,10 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator); ...@@ -97,11 +97,10 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator);
void doSetOperatorCompleted(SOperatorInfo* pOperator) { void doSetOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
ASSERT(pOperator->pTaskInfo != NULL);
pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0; pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0;
if (pOperator->pTaskInfo != NULL) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
}
} }
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
...@@ -1092,13 +1091,13 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM ...@@ -1092,13 +1091,13 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
} }
SFilterInfo* filter = pFilterInfo; SFilterInfo* filter = pFilterInfo;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
// pError("start filter"); // pError("start filter");
// todo move to the initialization function // todo move to the initialization function
int32_t code = 0; int32_t code = 0;
bool needFree = false; bool needFree = false;
if (filter == NULL) { if (filter == NULL) {
needFree = true; needFree = true;
code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
...@@ -1848,6 +1847,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf ...@@ -1848,6 +1847,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) { if (NULL == pMsgSendInfo) {
taosMemoryFreeClear(pMsg); taosMemoryFreeClear(pMsg);
taosMemoryFree(pWrapper);
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pTaskInfo->code; return pTaskInfo->code;
...@@ -3094,8 +3094,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -3094,8 +3094,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
} }
return pOperator; return pOperator;
_error: _error:
destroyAggOperatorInfo(pInfo); if (pInfo != NULL) {
destroyAggOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
...@@ -3242,13 +3246,16 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3242,13 +3246,16 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
goto _error; goto _error;
} }
SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
pOperator->exprSupp.pExprInfo = pExprInfo;
pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr); pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr);
int32_t code = createWStartTsAsNotFillExpr(pInfo, pPhyFillNode); int32_t code = createWStartTsAsNotFillExpr(pInfo, pPhyFillNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
SInterval* pInterval = SInterval* pInterval =
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
...@@ -3258,16 +3265,20 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3258,16 +3265,20 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
int32_t type = convertFillType(pPhyFillNode->mode); int32_t type = convertFillType(pPhyFillNode->mode);
SResultInfo* pResultInfo = &pOperator->resultInfo; SResultInfo* pResultInfo = &pOperator->resultInfo;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr); code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId; pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId; pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, pInfo->pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID); &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr, code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr,
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
...@@ -3276,17 +3287,14 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3276,17 +3287,14 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
goto _error; goto _error;
} }
pInfo->pRes = pResBlock; pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false);
pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
pInfo->pCondition = pPhyFillNode->node.pConditions; pInfo->pCondition = pPhyFillNode->node.pConditions;
pInfo->pColMatchColInfo = pColMatchColInfo;
pOperator->name = "FillOperator"; pOperator->name = "FillOperator";
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr; pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
...@@ -3298,8 +3306,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -3298,8 +3306,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
return pOperator; return pOperator;
_error: _error:
if (pInfo != NULL) {
destroyFillOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
taosMemoryFreeClear(pInfo);
return NULL; return NULL;
} }
...@@ -3388,7 +3399,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { ...@@ -3388,7 +3399,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
pSchema->colId = pColNode->colId; pSchema->colId = pColNode->colId;
pSchema->type = pColNode->node.resType.type; pSchema->type = pColNode->node.resType.type;
pSchema->bytes = pColNode->node.resType.bytes; pSchema->bytes = pColNode->node.resType.bytes;
strncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name)); tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
} }
} }
......
...@@ -384,6 +384,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -384,6 +384,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
numOfRows = TWOMB / pResBlock->info.rowSize; numOfRows = TWOMB / pResBlock->info.rowSize;
} }
initBasicInfo(&pInfo->binfo, pResBlock);
initResultSizeInfo(&pOperator->resultInfo, numOfRows); initResultSizeInfo(&pOperator->resultInfo, numOfRows);
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
...@@ -391,7 +392,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -391,7 +392,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
goto _error; goto _error;
} }
initBasicInfo(&pInfo->binfo, pResBlock);
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
......
...@@ -1770,6 +1770,7 @@ FETCH_NEXT_BLOCK: ...@@ -1770,6 +1770,7 @@ FETCH_NEXT_BLOCK:
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pDelBlock, "stream scan delete result"); printDataBlock(pDelBlock, "stream scan delete result");
if (pInfo->pDeleteDataRes->info.rows > 0) { if (pInfo->pDeleteDataRes->info.rows > 0) {
blockDataDestroy(pDelBlock);
return pInfo->pDeleteDataRes; return pInfo->pDeleteDataRes;
} else { } else {
goto FETCH_NEXT_BLOCK; goto FETCH_NEXT_BLOCK;
...@@ -3001,7 +3002,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { ...@@ -3001,7 +3002,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
while (1) { while (1) {
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
tstrncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); tstrncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
strcpy(pInfo->req.user, pInfo->pUser); tstrncpy(pInfo->req.user, pInfo->pUser, tListLen(pInfo->req.user));
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
char* buf1 = taosMemoryCalloc(1, contLen); char* buf1 = taosMemoryCalloc(1, contLen);
...@@ -3332,6 +3333,11 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags ...@@ -3332,6 +3333,11 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
const char* idStr) { const char* idStr) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
if (pHandle == NULL) {
qError("invalid handle, in creating operator tree", idStr);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo); int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to getTableList, code: %s", tstrerror(code)); qError("failed to getTableList, code: %s", tstrerror(code));
......
...@@ -398,15 +398,16 @@ int32_t finishSortGroup(SOperatorInfo* pOperator) { ...@@ -398,15 +398,16 @@ int32_t finishSortGroup(SOperatorInfo* pOperator) {
SGroupSortOperatorInfo* pInfo = pOperator->info; SGroupSortOperatorInfo* pInfo = pOperator->info;
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pCurrSortHandle); SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pCurrSortHandle);
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer; pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
pInfo->sortExecInfo.loops += sortExecInfo.loops; pInfo->sortExecInfo.loops += sortExecInfo.loops;
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
if (pInfo->pCurrSortHandle != NULL) {
tsortDestroySortHandle(pInfo->pCurrSortHandle); tsortDestroySortHandle(pInfo->pCurrSortHandle);
}
pInfo->pCurrSortHandle = NULL; pInfo->pCurrSortHandle = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -717,10 +718,9 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -717,10 +718,9 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo)); SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t rowSize = pResBlock->info.rowSize;
pInfo->binfo.pRes = createResDataBlock(pDescNode);
int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) {
goto _error; goto _error;
} }
...@@ -734,7 +734,6 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -734,7 +734,6 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->groupSort = pMergePhyNode->groupSort; pInfo->groupSort = pMergePhyNode->groupSort;
pInfo->binfo.pRes = pResBlock;
pInfo->pSortInfo = pSortInfo; pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pColMatchInfo = pColMatchColInfo;
pInfo->pInputBlock = pInputBlock; pInfo->pInputBlock = pInputBlock;
...@@ -761,7 +760,10 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size ...@@ -761,7 +760,10 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
_error: _error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo); if (pInfo != NULL) {
destroyMultiwayMergeOperatorInfo(pInfo);
}
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
return NULL; return NULL;
} }
...@@ -1480,7 +1480,7 @@ static void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, STimeWindowAgg ...@@ -1480,7 +1480,7 @@ static void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, STimeWindowAgg
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
TSKEY startTs = TMAX(tsStarts[i], pTwSup->minTs); TSKEY startTs = TMAX(tsStarts[i], pTwSup->minTs);
TSKEY endTs = TMIN(tsEnds[i], pTwSup->maxTs); TSKEY endTs = TMIN(tsEnds[i], pTwSup->maxTs);
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo = {0};
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTs, pInterval, TSDB_ORDER_ASC); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTs, pInterval, TSDB_ORDER_ASC);
do { do {
...@@ -1506,8 +1506,9 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* ...@@ -1506,8 +1506,9 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* pGpDatas = (uint64_t*)pGpCol->pData; uint64_t* pGpDatas = (uint64_t*)pGpCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
SResultRowInfo dumyInfo; SResultRowInfo dumyInfo = {0};
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC); STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCols[i], pInterval, TSDB_ORDER_ASC);
while (win.ekey <= endTsCols[i]) { while (win.ekey <= endTsCols[i]) {
uint64_t winGpId = pGpDatas[i]; uint64_t winGpId = pGpDatas[i];
...@@ -2748,11 +2749,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -2748,11 +2749,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
goto _error; goto _error;
} }
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pStateNode->window.node.pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
if (pStateNode->window.pExprs != NULL) { if (pStateNode->window.pExprs != NULL) {
...@@ -2775,6 +2772,10 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -2775,6 +2772,10 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pStateNode->window.node.pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -3618,14 +3619,13 @@ void destroyStreamSessionAggOperatorInfo(void* param) { ...@@ -3618,14 +3619,13 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock) { SSDataBlock* pResultBlock) {
initBasicInfo(pBasicInfo, pResultBlock);
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); int32_t code = initExprSupp(pSup, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
initBasicInfo(pBasicInfo, pResultBlock);
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pSup->pCtx[i].saveHandle.pBuf = NULL; pSup->pCtx[i].saveHandle.pBuf = NULL;
} }
...@@ -3665,9 +3665,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3665,9 +3665,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t code = TSDB_CODE_OUT_OF_MEMORY; int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo)); SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -3685,8 +3684,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3685,8 +3684,11 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
goto _error; goto _error;
} }
} }
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -3701,8 +3703,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3701,8 +3703,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
if (pInfo->pDummyCtx == NULL) { if (pInfo->pDummyCtx == NULL) {
goto _error; goto _error;
} }
initDummyFunction(pInfo->pDummyCtx, pSup->pCtx, numOfCols);
initDummyFunction(pInfo->pDummyCtx, pSup->pCtx, numOfCols);
pInfo->twAggSup = (STimeWindowAggSupp){ pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pSessionNode->window.watermark, .waterMark = pSessionNode->window.watermark,
.calTrigger = pSessionNode->window.triggerType, .calTrigger = pSessionNode->window.triggerType,
...@@ -5004,14 +5006,14 @@ int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFu ...@@ -5004,14 +5006,14 @@ int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFu
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode; SStreamStateWinodwPhysiNode* pStateNode = (SStreamStateWinodwPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
int32_t code = TSDB_CODE_OUT_OF_MEMORY; int32_t code = TSDB_CODE_SUCCESS;
SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo)); SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
...@@ -5025,7 +5027,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5025,7 +5027,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
if (pStateNode->window.pExprs != NULL) { if (pStateNode->window.pExprs != NULL) {
int32_t numOfScalar = 0; int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar); SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -5040,6 +5042,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5040,6 +5042,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
}; };
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -5615,7 +5618,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge ...@@ -5615,7 +5618,6 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
SSDataBlock* pResBlock = createResDataBlock(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
SInterval interval = {.interval = pIntervalPhyNode->interval, SInterval interval = {.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding, .sliding = pIntervalPhyNode->sliding,
...@@ -5644,6 +5646,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge ...@@ -5644,6 +5646,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
goto _error; goto _error;
} }
SSDataBlock* pResBlock = createResDataBlock(pIntervalPhyNode->window.node.pOutputDataBlockDesc);
initBasicInfo(&pIntervalInfo->binfo, pResBlock); initBasicInfo(&pIntervalInfo->binfo, pResBlock);
initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win); initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win);
...@@ -5675,7 +5678,10 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge ...@@ -5675,7 +5678,10 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
return pOperator; return pOperator;
_error: _error:
destroyMergeIntervalOperatorInfo(pMergeIntervalInfo); if (pMergeIntervalInfo != NULL) {
destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
}
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -5827,8 +5833,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5827,8 +5833,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->isFinal = false; pInfo->isFinal = false;
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
...@@ -5837,8 +5841,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -5837,8 +5841,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
initStreamFunciton(pSup->pCtx, pSup->numOfExprs); initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -220,7 +220,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) { ...@@ -220,7 +220,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) {
pHashObj->pBucket[pHashObj->numOfBuckets] = pBucket; pHashObj->pBucket[pHashObj->numOfBuckets] = pBucket;
pBucket->pPageIdList = taosArrayInit(2, sizeof(int32_t)); pBucket->pPageIdList = taosArrayInit(2, sizeof(int32_t));
if (pBucket->pPageIdList == NULL || pBucket == NULL) { if (pBucket->pPageIdList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -256,6 +256,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_ ...@@ -256,6 +256,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_
int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, "", tsTempDir); int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, "", tsTempDir);
if (code != 0) { if (code != 0) {
taosMemoryFree(pHashObj);
terrno = code; terrno = code;
return NULL; return NULL;
} }
...@@ -351,7 +352,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data ...@@ -351,7 +352,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data
char* pStart = p->data; char* pStart = p->data;
while (pStart - ((char*)p) < p->num) { while (pStart - ((char*)p) < p->num) {
SLHashNode* pNode = (SLHashNode*)pStart; SLHashNode* pNode = (SLHashNode*)pStart;
ASSERT(pNode->keyLen > 0 && pNode->dataLen >= 0); ASSERT(pNode->keyLen > 0);
char* k = GET_LHASH_NODE_KEY(pNode); char* k = GET_LHASH_NODE_KEY(pNode);
int32_t hashv = pHashObj->hashFn(k, pNode->keyLen); int32_t hashv = pHashObj->hashFn(k, pNode->keyLen);
......
...@@ -548,7 +548,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -548,7 +548,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList); code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList);
return code; return code;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册