未验证 提交 4039e99d 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #17816 from taosdata/fix/TD-19686

fix: return err code when query fail
...@@ -30,10 +30,10 @@ ...@@ -30,10 +30,10 @@
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups // The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
struct STableListInfo { struct STableListInfo {
bool oneTableForEachGroup; bool oneTableForEachGroup;
int32_t numOfOuputGroups; // the data block will be generated one by one int32_t numOfOuputGroups; // the data block will be generated one by one
int32_t* groupOffset; // keep the offset value for each group in the tableList int32_t* groupOffset; // keep the offset value for each group in the tableList
SArray* pTableList; SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid SHashObj* map; // speedup acquire the tableQueryInfo by table uid
uint64_t suid; uint64_t suid;
}; };
...@@ -1678,9 +1678,7 @@ uint64_t tableListGetSize(const STableListInfo* pTableList) { ...@@ -1678,9 +1678,7 @@ uint64_t tableListGetSize(const STableListInfo* pTableList) {
return taosArrayGetSize(pTableList->pTableList); return taosArrayGetSize(pTableList->pTableList);
} }
uint64_t tableListGetSuid(const STableListInfo* pTableList) { uint64_t tableListGetSuid(const STableListInfo* pTableList) { return pTableList->suid; }
return pTableList->suid;
}
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) { STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
if (taosArrayGetSize(pTableList->pTableList) == 0) { if (taosArrayGetSize(pTableList->pTableList) == 0) {
...@@ -1718,7 +1716,7 @@ int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t ...@@ -1718,7 +1716,7 @@ int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t
} }
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
int32_t* size) { int32_t* size) {
int32_t total = tableListGetOutputGroups(pTableList); int32_t total = tableListGetOutputGroups(pTableList);
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) { if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
...@@ -1728,7 +1726,7 @@ int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalG ...@@ -1728,7 +1726,7 @@ int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalG
// 1. only one group exists, and 2. one table exists for each group. // 1. only one group exists, and 2. one table exists for each group.
if (total == 1) { if (total == 1) {
*size = tableListGetSize(pTableList); *size = tableListGetSize(pTableList);
*pKeyInfo = (*size == 0)? NULL:taosArrayGet(pTableList->pTableList, 0); *pKeyInfo = (*size == 0) ? NULL : taosArrayGet(pTableList->pTableList, 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (total == tableListGetSize(pTableList)) { } else if (total == tableListGetSize(pTableList)) {
*size = 1; *size = 1;
...@@ -1806,13 +1804,13 @@ void tableListClear(STableListInfo* pTableListInfo) { ...@@ -1806,13 +1804,13 @@ void tableListClear(STableListInfo* pTableListInfo) {
} }
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) { static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
STableKeyInfo* pInfo1 = (STableKeyInfo*) p1; STableKeyInfo* pInfo1 = (STableKeyInfo*)p1;
STableKeyInfo* pInfo2 = (STableKeyInfo*) p2; STableKeyInfo* pInfo2 = (STableKeyInfo*)p2;
if (pInfo1->groupId == pInfo2->groupId) { if (pInfo1->groupId == pInfo2->groupId) {
return 0; return 0;
} else { } else {
return pInfo1->groupId < pInfo2->groupId? -1:1; return pInfo1->groupId < pInfo2->groupId ? -1 : 1;
} }
} }
...@@ -1825,12 +1823,12 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { ...@@ -1825,12 +1823,12 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
SArray* pList = taosArrayInit(4, sizeof(int32_t)); SArray* pList = taosArrayInit(4, sizeof(int32_t));
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0); STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
uint64_t gid = pInfo->groupId; uint64_t gid = pInfo->groupId;
int32_t start = 0; int32_t start = 0;
taosArrayPush(pList, &start); taosArrayPush(pList, &start);
for(int32_t i = 1; i < size; ++i) { for (int32_t i = 1; i < size; ++i) {
pInfo = taosArrayGet(pTableListInfo->pTableList, i); pInfo = taosArrayGet(pTableListInfo->pTableList, i);
if (pInfo->groupId != gid) { if (pInfo->groupId != gid) {
taosArrayPush(pList, &i); taosArrayPush(pList, &i);
...@@ -1845,16 +1843,17 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { ...@@ -1845,16 +1843,17 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group,
bool groupSort) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
ASSERT(pTableListInfo->map != NULL); ASSERT(pTableListInfo->map != NULL);
bool groupByTbname = groupbyTbname(group); bool groupByTbname = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
if (group == NULL || groupByTbname) { if (group == NULL || groupByTbname) {
for (int32_t i = 0; i < numOfTables; i++) { for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = groupByTbname? info->uid:0; info->groupId = groupByTbname ? info->uid : 0;
} }
pTableListInfo->oneTableForEachGroup = groupByTbname; pTableListInfo->oneTableForEachGroup = groupByTbname;
...@@ -1878,7 +1877,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* ...@@ -1878,7 +1877,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
// add all table entry in the hash map // add all table entry in the hash map
size_t size = taosArrayGetSize(pTableListInfo->pTableList); size_t size = taosArrayGetSize(pTableListInfo->pTableList);
for(int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t)); taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
} }
...@@ -1889,7 +1888,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* ...@@ -1889,7 +1888,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
struct SExecTaskInfo* pTaskInfo) { struct SExecTaskInfo* pTaskInfo) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
const char* idStr = GET_TASKID(pTaskInfo); const char* idStr = GET_TASKID(pTaskInfo);
if (pHandle == NULL) { if (pHandle == NULL) {
...@@ -1919,7 +1918,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags ...@@ -1919,7 +1918,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return code; return code;
} }
pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1)/1000.0; pTaskInfo->cost.groupIdMapTime = (taosGetTimestampUs() - st1) / 1000.0;
qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr); qDebug("generate group id map completed, elapsed time:%.2f ms %s", pTaskInfo->cost.groupIdMapTime, idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -321,7 +321,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S ...@@ -321,7 +321,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
} }
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) { int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (isAdd) { if (isAdd) {
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str); qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
...@@ -473,7 +473,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, ...@@ -473,7 +473,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId); qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
_error: _error:
// if failed to add ref for all tables in this query, abort current query // if failed to add ref for all tables in this query, abort current query
return code; return code;
} }
...@@ -1027,10 +1027,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1027,10 +1027,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (pTableScanInfo->dataReader == NULL) { if (pTableScanInfo->dataReader == NULL) {
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t num = tableListGetSize(pTaskInfo->pTableInfoList); int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num, if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num,
&pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) { &pTableScanInfo->dataReader, NULL) < 0 ||
pTableScanInfo->dataReader == NULL) {
ASSERT(0); ASSERT(0);
} }
} }
...@@ -1071,14 +1072,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1071,14 +1072,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
if (pTaskInfo->pTableInfoList == NULL) { if (pTaskInfo->pTableInfoList == NULL) {
pTaskInfo->pTableInfoList = tableListCreate(); pTaskInfo->pTableInfoList = tableListCreate();
} }
tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0); tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0);
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
ASSERT(size == 1); ASSERT(size == 1);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->dataReader, NULL); tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->dataReader, NULL);
......
...@@ -1062,15 +1062,18 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi ...@@ -1062,15 +1062,18 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond); int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return NULL; goto _error;
} }
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
size_t num = tableListGetSize(pTableListInfo); size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0); void* pList = tableListGetInfo(pTableListInfo, 0);
tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str); code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str);
cleanupQueryTableDataCond(&cond); cleanupQueryTableDataCond(&cond);
if (code != 0) {
goto _error;
}
} }
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
...@@ -1164,6 +1167,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU ...@@ -1164,6 +1167,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
T_LONG_JMP(pTaskInfo->env, code);
return NULL; return NULL;
} }
...@@ -2418,8 +2422,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2418,8 +2422,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle->initTableReader) { if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pTSInfo->dataReader = NULL; pTSInfo->dataReader = NULL;
if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL) < 0) { int32_t code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL);
terrno = TSDB_CODE_OUT_OF_MEMORY; if (code != 0) {
terrno = code;
destroyTableScanOperatorInfo(pTableScanOp); destroyTableScanOperatorInfo(pTableScanOp);
goto _error; goto _error;
} }
...@@ -4284,131 +4289,6 @@ int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* ...@@ -4284,131 +4289,6 @@ int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle,
STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx,
STsdbReader** ppReader, const char* idstr) {
STsdbReader* pReader = NULL;
void* pStart = tableListGetInfo(pTableListInfo, tableStartIdx);
int32_t num = tableEndIdx - tableStartIdx + 1;
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, num, &pReader, idstr);
if (code != 0) {
return code;
}
*ppReader = pReader;
return TSDB_CODE_SUCCESS;
}
static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
SSDataBlock* pBlock, uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableMergeScanInfo* pInfo = pOperator->info;
uint64_t uid = pBlock->info.uid;
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows;
*status = pInfo->dataBlockLoadFlag;
if (pTableScanInfo->pFilterNode != NULL ||
overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
}
SDataBlockInfo* pBlockInfo = &pBlock->info;
taosMemoryFreeClear(pBlock->pBlockAgg);
if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
// clear all data in pBlock that are set when handing the previous block
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
pcol->pData = NULL;
}
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
pCost->loadBlockStatis += 1;
bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL;
STsdbReader* reader = pTableScanInfo->pReader;
tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg);
if (allColumnsHaveAgg == true) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
// todo create this buffer during creating operator
if (pBlock->pBlockAgg == NULL) {
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i);
if (!pColMatchInfo->needOutput) {
continue;
}
pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i];
}
return TSDB_CODE_SUCCESS;
} else { // failed to load the block sma data, data block statistics does not exist, load data block instead
*status = FUNC_DATA_REQUIRED_DATA_LOAD;
}
}
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1;
STsdbReader* reader = pTableScanInfo->pReader;
SArray* pCols = tsdbRetrieveDataBlock(reader, NULL);
if (pCols == NULL) {
return terrno;
}
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
// currently only the tbname pseudo column
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
int32_t code =
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
}
if (pTableScanInfo->pFilterNode != NULL) {
int64_t st = taosGetTimestampMs();
doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL);
double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el;
if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
} else {
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
}
}
return TSDB_CODE_SUCCESS;
}
// todo refactor // todo refactor
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
...@@ -4535,7 +4415,7 @@ typedef struct STableMergeScanSortSourceParam { ...@@ -4535,7 +4415,7 @@ typedef struct STableMergeScanSortSourceParam {
SSDataBlock* inputBlock; SSDataBlock* inputBlock;
} STableMergeScanSortSourceParam; } STableMergeScanSortSourceParam;
static SSDataBlock* getTableDataBlockTemp(void* param) { static SSDataBlock* getTableDataBlockImpl(void* param) {
STableMergeScanSortSourceParam* source = param; STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator; SOperatorInfo* pOperator = source->pOperator;
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
...@@ -4552,7 +4432,11 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { ...@@ -4552,7 +4432,11 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex); void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->readHandle; SReadHandle* pHandle = &pInfo->readHandle;
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo));
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo));
if (code != 0) {
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
STsdbReader* reader = pInfo->pReader; STsdbReader* reader = pInfo->pReader;
while (tsdbNextDataBlock(reader)) { while (tsdbNextDataBlock(reader)) {
...@@ -4603,55 +4487,6 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { ...@@ -4603,55 +4487,6 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
pInfo->pReader = NULL; pInfo->pReader = NULL;
return NULL; return NULL;
} }
static SSDataBlock* getTableDataBlock2(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
int64_t uid = source->uid;
SSDataBlock* pBlock = source->inputBlock;
STableMergeScanInfo* pTableScanInfo = pOperator->info;
int64_t st = taosGetTimestampUs();
blockDataCleanup(pBlock);
STsdbReader* reader = pTableScanInfo->pReader;
while (tsdbTableNextDataBlock(reader, uid)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// process this data block based on the probabilities
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
if (!processThisBlock) {
continue;
}
blockDataCleanup(pBlock);
int32_t rows = 0;
tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window);
blockDataEnsureCapacity(pBlock, rows);
pBlock->info.rows = rows;
uint32_t status = 0;
int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
continue;
}
pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid);
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
}
return NULL;
}
static SSDataBlock* getTableDataBlock(void* param) { static SSDataBlock* getTableDataBlock(void* param) {
STableMergeScanSortSourceParam* source = param; STableMergeScanSortSourceParam* source = param;
...@@ -4761,7 +4596,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -4761,7 +4596,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str); pInfo->pSortInputBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockTemp, NULL, NULL); tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
// one table has one data block // one table has one data block
int32_t numOfTable = tableEndIdx - tableStartIdx + 1; int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
......
...@@ -122,14 +122,14 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { ...@@ -122,14 +122,14 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
taosGetCpuCores(&numCpuCores); taosGetCpuCores(&numCpuCores);
snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores * 2); snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores * 2);
char pathTaosdLdLib[512] = {0}; char pathTaosdLdLib[512] = {0};
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib); size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib);
int ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen); int ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
if (ret != UV_ENOBUFS) { if (ret != UV_ENOBUFS) {
taosdLdLibPathLen = strlen(pathTaosdLdLib); taosdLdLibPathLen = strlen(pathTaosdLdLib);
} }
char udfdPathLdLib[1024] = {0}; char udfdPathLdLib[1024] = {0};
size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath); size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath);
strncpy(udfdPathLdLib, tsUdfdLdLibPath, udfdLdLibPathLen); strncpy(udfdPathLdLib, tsUdfdLdLibPath, udfdLdLibPathLen);
udfdPathLdLib[udfdLdLibPathLen] = ':'; udfdPathLdLib[udfdLdLibPathLen] = ':';
...@@ -362,7 +362,7 @@ typedef struct SUdfcProxy { ...@@ -362,7 +362,7 @@ typedef struct SUdfcProxy {
SArray *udfStubs; // SUdfcFuncStub SArray *udfStubs; // SUdfcFuncStub
uv_mutex_t udfcUvMutex; uv_mutex_t udfcUvMutex;
int8_t initialized; int8_t initialized;
} SUdfcProxy; } SUdfcProxy;
SUdfcProxy gUdfcProxy = {0}; SUdfcProxy gUdfcProxy = {0};
......
...@@ -127,7 +127,7 @@ static void uvFreeCb(uv_handle_t* handle); ...@@ -127,7 +127,7 @@ static void uvFreeCb(uv_handle_t* handle);
static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg); static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg);
static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb); static int uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSvrMsg* msg); static void uvStartSendResp(SSvrMsg* msg);
static void uvNotifyLinkBrokenToApp(SSvrConn* conn); static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
...@@ -384,7 +384,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { ...@@ -384,7 +384,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
taosMemoryFree(req); taosMemoryFree(req);
} }
static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
SSvrConn* pConn = smsg->pConn; SSvrConn* pConn = smsg->pConn;
STransMsg* pMsg = &smsg->msg; STransMsg* pMsg = &smsg->msg;
if (pMsg->pCont == 0) { if (pMsg->pCont == 0) {
...@@ -397,6 +397,13 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { ...@@ -397,6 +397,13 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->hasEpSet = pMsg->info.hasEpSet; pHead->hasEpSet = pMsg->info.hasEpSet;
pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->magicNum = htonl(TRANS_MAGIC_NUM);
// handle invalid drop_task resp, TD-20098
if (pMsg->msgType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
transQueuePop(&pConn->srvMsgs);
destroySmsg(smsg);
return -1;
}
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType);
if (smsg->type == Release) pHead->msgType = 0; if (smsg->type == Release) pHead->msgType = 0;
...@@ -431,6 +438,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { ...@@ -431,6 +438,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
wb->base = (char*)pHead; wb->base = (char*)pHead;
wb->len = len; wb->len = len;
return 0;
} }
static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
...@@ -440,7 +448,9 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { ...@@ -440,7 +448,9 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
} }
uv_buf_t wb; uv_buf_t wb;
uvPrepareSendData(smsg, &wb); if (uvPrepareSendData(smsg, &wb) < 0) {
return;
}
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
...@@ -451,8 +461,9 @@ static void uvStartSendResp(SSvrMsg* smsg) { ...@@ -451,8 +461,9 @@ static void uvStartSendResp(SSvrMsg* smsg) {
SSvrConn* pConn = smsg->pConn; SSvrConn* pConn = smsg->pConn;
if (pConn->broken == true) { if (pConn->broken == true) {
// persist by // persist by
transFreeMsg(smsg->msg.pCont); destroySmsg(smsg);
taosMemoryFree(smsg); // transFreeMsg(smsg->msg.pCont);
// taosMemoryFree(smsg);
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
return; return;
} }
...@@ -748,10 +759,11 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -748,10 +759,11 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
return; return;
} }
transSockInfo2Str(&sockname, pConn->src); transSockInfo2Str(&sockname, pConn->src);
struct sockaddr_in addr = *(struct sockaddr_in*)&sockname;
struct sockaddr_in addr = *(struct sockaddr_in*)&peername;
pConn->clientIp = addr.sin_addr.s_addr; pConn->clientIp = addr.sin_addr.s_addr;
pConn->port = ntohs(addr.sin_port); pConn->port = ntohs(addr.sin_port);
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
} else { } else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册