未验证 提交 0df311a7 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #17264 from taosdata/refact/query_opt

refactor:do some internal refactor to optimize the query performance.
......@@ -93,6 +93,7 @@ extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in
// query client
extern int32_t tsQueryPolicy;
extern int32_t tsQueryRspPolicy;
extern int32_t tsQuerySmaOptimize;
extern int32_t tsQueryRsmaTolerance;
extern bool tsQueryPlannerTrace;
......
......@@ -54,6 +54,9 @@ typedef enum {
#define QUERY_POLICY_QNODE 3
#define QUERY_POLICY_CLIENT 4
#define QUERY_RSP_POLICY_DELAY 0
#define QUERY_RSP_POLICY_QUICK 1
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision
......
......@@ -1036,30 +1036,38 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
pRequest->type = pQuery->msgType;
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
SPlanContext cxt = {.queryId = pRequest->requestId,
.acctId = pRequest->pTscObj->acctId,
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pUser = pRequest->pTscObj->user,
.sysInfo = pRequest->pTscObj->sysInfo,
.allocatorId = pRequest->allocatorRefId};
SQueryPlan* pDag = NULL;
int32_t code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
if (code) {
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
} else {
pRequest->body.subplanNum = pDag->numOfSubplans;
}
pRequest->metric.planEnd = taosGetTimestampUs();
SPlanContext cxt = {.queryId = pRequest->requestId,
.acctId = pRequest->pTscObj->acctId,
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pUser = pRequest->pTscObj->user,
.sysInfo = pRequest->pTscObj->sysInfo,
.allocatorId = pRequest->allocatorRefId};
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
SQueryPlan* pDag = NULL;
int64_t st = taosGetTimestampUs();
int32_t code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
if (code) {
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
} else {
pRequest->body.subplanNum = pDag->numOfSubplans;
}
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
SArray* pNodeList = NULL;
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
pRequest->metric.planEnd = taosGetTimestampUs();
if (code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " create query plan success, elapsed time:%.2f ms, 0x%" PRIx64, pRequest->self,
(pRequest->metric.planEnd - st)/1000.0, pRequest->requestId);
}
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
SArray* pNodeList = NULL;
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
.requestId = pRequest->requestId,
......
......@@ -52,15 +52,15 @@ void printResult(TAOS_RES* pRes) {
int32_t n = 0;
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes);
for (int32_t i = 0; i < numOfFields; ++i) {
printf("(%d):%d ", i, length[i]);
}
printf("\n");
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
memset(str, 0, sizeof(str));
// int32_t* length = taos_fetch_lengths(pRes);
// for(int32_t i = 0; i < numOfFields; ++i) {
// printf("(%d):%d " , i, length[i]);
// }
// printf("\n");
//
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// memset(str, 0, sizeof(str));
}
}
......@@ -102,17 +102,6 @@ void queryCallback(void* param, void* res, int32_t code) {
taos_fetch_raw_block_a(res, fetchCallback, param);
}
void queryCallback1(void* param, void* res, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
printf("failed to execute, reason:%s\n", taos_errstr(res));
}
taos_free_result(res);
printf("exec query:\n");
taos_query_a(param, "select * from tm1", queryCallback, param);
}
void createNewTable(TAOS* pConn, int32_t index) {
char str[1024] = {0};
sprintf(str, "create table tu%d using st2 tags(%d)", index, index);
......@@ -123,7 +112,7 @@ void createNewTable(TAOS* pConn, int32_t index) {
}
taos_free_result(pRes);
for (int32_t i = 0; i < 3280; i += 20) {
for(int32_t i = 0; i < 10000; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
......@@ -141,10 +130,49 @@ void createNewTable(TAOS* pConn, int32_t index) {
taos_free_result(p);
}
}
void *queryThread(void *arg) {
TAOS* pConn = taos_connect("192.168.0.209", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
printf("failed to connect to db, reason:%s", taos_errstr(pConn));
return NULL;
}
int64_t el = 0;
for (int32_t i = 0; i < 5000000; ++i) {
int64_t st = taosGetTimestampUs();
TAOS_RES* pRes = taos_query(pConn,
"SELECT _wstart as ts,max(usage_user) FROM benchmarkcpu.host_49 WHERE ts >= 1451618560000 AND ts < 1451622160000 INTERVAL(1m) ;");
if (taos_errno(pRes) != 0) {
printf("failed, reason:%s\n", taos_errstr(pRes));
} else {
printResult(pRes);
}
taos_free_result(pRes);
el += (taosGetTimestampUs() - st);
if (i % 1000 == 0 && i != 0) {
printf("total:%d, avg time:%.2fms\n", i, el/(double)(i*1000));
}
}
taos_close(pConn);
return NULL;
}
static int32_t numOfThreads = 1;
} // namespace
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
if (argc > 1) {
numOfThreads = atoi(argv[1]);
}
numOfThreads = TMAX(numOfThreads, 1);
printf("the runing threads is:%d", numOfThreads);
return RUN_ALL_TESTS();
}
......@@ -664,7 +692,6 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -676,13 +703,14 @@ TEST(testCase, projection_query_tables) {
// }
// taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
TAOS_RES* pRes = taos_query(pConn, "use benchmarkcpu");
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int) tags(a int)");
......@@ -722,6 +750,16 @@ TEST(testCase, projection_query_tables) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST(testCase, tsbs_perf_test) {
TdThread qid[20] = {0};
for(int32_t i = 0; i < numOfThreads; ++i) {
taosThreadCreate(&qid[i], NULL, queryThread, NULL);
}
getchar();
}
#if 0
TEST(testCase, projection_query_stables) {
......
......@@ -90,6 +90,7 @@ bool tsSmlDataFormat = false;
// query
int32_t tsQueryPolicy = 1;
int32_t tsQueryRspPolicy = 0;
int32_t tsQuerySmaOptimize = 0;
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
bool tsQueryPlannerTrace = false;
......@@ -350,6 +351,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
......@@ -728,6 +730,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32;
tsMonitorMaxLogs = cfgGetItem(pCfg, "monitorMaxLogs")->i32;
tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval;
tsQueryRspPolicy = cfgGetItem(pCfg, "queryRspPolicy")->i32;
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
......
......@@ -72,7 +72,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
mndPostProcessQueryMsg(pMsg);
}
dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code));
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......
......@@ -543,6 +543,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
pMTree->pLoadInfo = pBlockLoadInfo;
pMTree->destroyLoadInfo = destroyLoadInfo;
ASSERT(pMTree->pLoadInfo != NULL);
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
struct SLDataIter *pIter = NULL;
......
......@@ -3395,19 +3395,20 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
goto _err;
}
code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
code = doOpenReaderImpl(pReader);
if (numOfTables > 0) {
code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _err;
}
} else {
STsdbReader* pPrevReader = pReader->innerReader[0];
STsdbReader* pNextReader = pReader->innerReader[1];
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
code = doOpenReaderImpl(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
STsdbReader* pPrevReader = pReader->innerReader[0];
STsdbReader* pNextReader = pReader->innerReader[1];
// we need only one row
pPrevReader->capacity = 1;
......@@ -3422,19 +3423,20 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
pNextReader->pMemSchema = pReader->pMemSchema;
pNextReader->pReadSnap = pReader->pReadSnap;
code = doOpenReaderImpl(pPrevReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doOpenReaderImpl(pPrevReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doOpenReaderImpl(pNextReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doOpenReaderImpl(pNextReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doOpenReaderImpl(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
code = doOpenReaderImpl(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
......@@ -3513,6 +3515,10 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pLReader);
}
if (pReader->innerReader[0] != 0) {
tsdbUntakeReadSnap(pReader->innerReader[0]->pTsdb, pReader->innerReader[0]->pReadSnap, pReader->idStr);
}
tsdbDebug(
"%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, fileBlocks:%" PRId64
......
......@@ -212,6 +212,7 @@ typedef struct SExprSupp {
int32_t numOfExprs; // the number of scalar expression in group operator
SqlFunctionCtx* pCtx;
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
SFilterInfo* pFilterInfo;
} SExprSupp;
typedef struct SOperatorInfo {
......@@ -926,7 +927,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock, const char* idStr);
......
......@@ -1113,15 +1113,24 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
int32_t status);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) {
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo) {
if (pFilterNode == NULL || pBlock->info.rows == 0) {
return;
}
SFilterInfo* filter = NULL;
SFilterInfo* filter = pFilterInfo;
int64_t st = taosGetTimestampUs();
// pError("start filter");
// todo move to the initialization function
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
int32_t code = 0;
bool needFree = false;
if (filter == NULL) {
needFree = true;
code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
}
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1);
......@@ -1130,7 +1139,10 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
// todo the keep seems never to be True??
bool keep = filterExecute(filter, pBlock, &p, NULL, param1.numOfCols, &status);
filterFreeInfo(filter);
if (needFree) {
filterFreeInfo(filter);
}
extractQualifiedTupleByFilterResult(pBlock, p, keep, status);
......@@ -2479,7 +2491,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
doFilter(pAggInfo->pCondition, pInfo->pRes, NULL);
doFilter(pAggInfo->pCondition, pInfo->pRes, NULL, NULL);
if (!hasRemainResults(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
......@@ -2873,7 +2885,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
break;
}
doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo);
doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo, NULL);
if (fillResult->info.rows > 0) {
break;
}
......@@ -3049,6 +3061,12 @@ void cleanupExprSupp(SExprSupp* pSupp) {
destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
taosMemoryFreeClear(pSupp->pExprInfo);
}
if (pSupp->pFilterInfo != NULL) {
filterFreeInfo(pSupp->pFilterInfo);
pSupp->pFilterInfo = NULL;
}
taosMemoryFree(pSupp->rowEntryInfoOffset);
}
......
......@@ -311,7 +311,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes;
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes, NULL);
doFilter(pInfo->pCondition, pRes, NULL, NULL);
if (!hasRemainResults(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
......
......@@ -387,7 +387,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
break;
}
if (pJoinInfo->pCondAfterMerge != NULL) {
doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL);
doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL, NULL);
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
......
......@@ -315,7 +315,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
// do apply filter
doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL);
doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL, NULL);
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
......@@ -325,7 +325,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} else {
// do apply filter
if (pRes->info.rows > 0) {
doFilter(pProjectInfo->pFilterNode, pRes, NULL);
doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL);
if (pRes->info.rows == 0) {
continue;
}
......@@ -518,7 +518,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
}
}
doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL);
doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL, NULL);
size_t rows = pInfo->pRes->info.rows;
if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
break;
......@@ -620,7 +620,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
}
pRes->info.rows = 1;
doFilter(pProjectInfo->pFilterNode, pRes, NULL);
doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL);
/*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);
......
......@@ -385,7 +385,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
if (pTableScanInfo->pFilterNode != NULL) {
int64_t st = taosGetTimestampUs();
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, pOperator->exprSupp.pFilterInfo);
double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el;
......@@ -754,6 +754,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pResBlock = createResDataBlock(pDescNode);
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
if (pInfo->pFilterNode != NULL) {
code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0);
}
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColList;
pInfo->currentGroupId = -1;
......@@ -1123,7 +1128,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
return NULL;
}
doFilter(pInfo->pCondition, pResult, NULL);
doFilter(pInfo->pCondition, pResult, NULL, NULL);
if (pResult->info.rows == 0) {
continue;
}
......@@ -1474,7 +1479,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
}
if (filter) {
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
}
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
blockDataFreeRes((SSDataBlock*)pBlock);
......@@ -1896,7 +1901,7 @@ FETCH_NEXT_BLOCK:
}
}
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
......@@ -2382,7 +2387,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
}
......@@ -3455,7 +3460,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
if (pTableScanInfo->pFilterNode != NULL) {
int64_t st = taosGetTimestampMs();
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, NULL);
double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el;
......
......@@ -216,7 +216,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
return NULL;
}
doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo);
doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo, NULL);
if (blockDataGetNumOfRows(pBlock) == 0) {
continue;
}
......
......@@ -1501,7 +1501,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
}
doStreamFillImpl(pOperator);
doFilter(pInfo->pCondition, pInfo->pRes, pInfo->pColMatchColInfo);
doFilter(pInfo->pCondition, pInfo->pRes, pInfo->pColMatchColInfo, NULL);
pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
if (pInfo->pRes->info.rows > 0) {
break;
......
......@@ -1277,7 +1277,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
......@@ -1315,7 +1315,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBlock, NULL);
doFilter(pInfo->pCondition, pBlock, NULL, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
......@@ -2019,7 +2019,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) {
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
......@@ -2062,7 +2062,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
......@@ -5251,7 +5251,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pSup->pCtx, pBlock, pIaInfo->inputOrder, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
doFilter(pMiaInfo->pCondition, pRes, NULL);
doFilter(pMiaInfo->pCondition, pRes, NULL, NULL);
if (pRes->info.rows >= pOperator->resultInfo.capacity) {
break;
}
......
......@@ -123,6 +123,7 @@ typedef struct SQWTaskCtx {
int32_t execId;
int32_t level;
bool queryGotData;
bool queryRsped;
bool queryEnd;
bool queryContinue;
......
......@@ -10,6 +10,7 @@
#include "tdatablock.h"
#include "tmsg.h"
#include "tname.h"
#include "tglobal.h"
SQWorkerMgmt gQwMgmt = {
.lock = 0,
......@@ -92,6 +93,19 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
return TSDB_CODE_SUCCESS;
}
int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) {
if ((!quickRsp) || QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy) {
if (!ctx->localExec) {
qwBuildAndSendQueryRsp(msgType, &ctx->ctrlConnInfo, rspCode, ctx);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, rspCode, tstrerror(rspCode));
}
ctx->queryRsped = true;
}
return TSDB_CODE_SUCCESS;
}
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
int32_t code = 0;
bool qcontinue = true;
......@@ -144,7 +158,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
if (numOfResBlock == 0) {
QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
} else {
QW_TASK_DLOG("qExecTask done", "");
QW_TASK_DLOG("qExecTask done, useconds:%" PRIu64, useconds);
}
dsEndPut(sinkHandle, useconds);
......@@ -234,6 +248,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
int32_t code = 0;
SOutputData output = {0};
if (NULL == ctx->sinkHandle) {
return TSDB_CODE_SUCCESS;
}
*dataLen = 0;
while (true) {
......@@ -407,6 +425,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
}
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed cause of %s, phase:%s", tstrerror(ctx->rspCode), qwPhaseStr(phase));
QW_ERR_JRET(ctx->rspCode);
}
if (!ctx->queryRsped) {
QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
......@@ -419,6 +442,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed cause of %s, phase:%s", tstrerror(ctx->rspCode), qwPhaseStr(phase));
QW_ERR_JRET(ctx->rspCode);
}
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
......@@ -499,21 +527,17 @@ _return:
if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
ctx->queryGotData = true;
}
if (QW_PHASE_POST_QUERY == phase && ctx) {
if (!ctx->localExec) {
bool rsped = false;
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
if (!rsped) {
qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
}
if (QW_PHASE_POST_QUERY == phase && ctx && !ctx->queryRsped) {
bool rsped = false;
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
if (!rsped) {
qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false);
}
ctx->queryRsped = true;
}
if (ctx) {
......@@ -551,6 +575,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
ctx->ctrlConnInfo = qwMsg->connInfo;
ctx->phase = -1;
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
......@@ -604,6 +629,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true);
ctx->level = plan->level;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
......@@ -619,6 +646,31 @@ _return:
input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
if (ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
void *rsp = NULL;
int32_t dataLen = 0;
SOutputData sOutput = {0};
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if (rsp) {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true);
}
qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
tstrerror(code), dataLen);
}
}
QW_RET(TSDB_CODE_SUCCESS);
}
......@@ -740,7 +792,9 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
locked = true;
// RC WARNING
if (QW_QUERY_RUNNING(ctx)) {
if (-1 == ctx->phase || false == ctx->queryGotData) {
QW_TASK_DLOG_E("task query unfinished");
} else if (QW_QUERY_RUNNING(ctx)) {
atomic_store_8((int8_t *)&ctx->queryContinue, 1);
} else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC);
......
......@@ -913,7 +913,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
// SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
}
SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
......@@ -993,6 +993,12 @@ int32_t schLaunchTaskImpl(void *param) {
SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
}
#if 0
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
}
#endif
_return:
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
......
......@@ -155,8 +155,7 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
return;
}
SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", errCode:0x%x", *jobId, errCode);
SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", code:%s", *jobId, tstrerror(errCode));
schHandleJobDrop(pJob, errCode);
schReleaseJob(*jobId);
......
......@@ -597,6 +597,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
tDebug("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn);
transAllocBuffer(pBuf, buf);
}
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
......@@ -609,7 +610,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
if (nread > 0) {
pBuf->len += nread;
while (transReadComplete(pBuf)) {
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
tDebug("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
if (pBuf->invalid) {
cliHandleExcept(conn);
break;
......
......@@ -203,7 +203,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
}
int transSetConnOption(uv_tcp_t* stream) {
uv_tcp_nodelay(stream, 1);
uv_tcp_nodelay(stream, 0);
int ret = uv_tcp_keepalive(stream, 5, 60);
return ret;
}
......
......@@ -332,7 +332,10 @@ void uvOnSendCb(uv_write_t* req, int status) {
if (status == 0) {
tTrace("conn %p data already was written on stream", conn);
if (!transQueueEmpty(&conn->srvMsgs)) {
SSvrMsg* msg = transQueuePop(&conn->srvMsgs);
SSvrMsg* msg = transQueuePop(&conn->srvMsgs);
STraceId* trace = &msg->msg.info.traceId;
tGDebug("conn %p write data out", conn);
destroySmsg(msg);
// send cached data
if (!transQueueEmpty(&conn->srvMsgs)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册