提交 74be3572 编写于 作者: D dapan1121

enh: add query response policy

上级 b7e897ab
...@@ -93,6 +93,7 @@ extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in ...@@ -93,6 +93,7 @@ extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in
// query client // query client
extern int32_t tsQueryPolicy; extern int32_t tsQueryPolicy;
extern int32_t tsQueryRspPolicy;
extern int32_t tsQuerySmaOptimize; extern int32_t tsQuerySmaOptimize;
extern int32_t tsQueryRsmaTolerance; extern int32_t tsQueryRsmaTolerance;
extern bool tsQueryPlannerTrace; extern bool tsQueryPlannerTrace;
......
...@@ -54,6 +54,9 @@ typedef enum { ...@@ -54,6 +54,9 @@ typedef enum {
#define QUERY_POLICY_QNODE 3 #define QUERY_POLICY_QNODE 3
#define QUERY_POLICY_CLIENT 4 #define QUERY_POLICY_CLIENT 4
#define QUERY_RSP_POLICY_DELAY 0
#define QUERY_RSP_POLICY_QUICK 1
typedef struct STableComInfo { typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision uint8_t precision; // the number of precision
......
...@@ -89,6 +89,7 @@ bool tsSmlDataFormat = false; // true means that the name and order of cols in ...@@ -89,6 +89,7 @@ bool tsSmlDataFormat = false; // true means that the name and order of cols in
// query // query
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
int32_t tsQueryRspPolicy = 0;
int32_t tsQuerySmaOptimize = 0; int32_t tsQuerySmaOptimize = 0;
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
bool tsQueryPlannerTrace = false; bool tsQueryPlannerTrace = false;
...@@ -345,6 +346,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -345,6 +346,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1; if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1; if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
...@@ -721,6 +723,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -721,6 +723,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32; tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32;
tsMonitorMaxLogs = cfgGetItem(pCfg, "monitorMaxLogs")->i32; tsMonitorMaxLogs = cfgGetItem(pCfg, "monitorMaxLogs")->i32;
tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval; tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval;
tsQueryRspPolicy = cfgGetItem(pCfg, "queryRspPolicy")->i32;
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval; tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32; tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
......
...@@ -3021,6 +3021,8 @@ void cleanupExprSupp(SExprSupp* pSupp) { ...@@ -3021,6 +3021,8 @@ void cleanupExprSupp(SExprSupp* pSupp) {
destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs); destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
taosMemoryFreeClear(pSupp->pExprInfo); taosMemoryFreeClear(pSupp->pExprInfo);
} }
filterFreeInfo(pSupp->pFilterInfo);
taosMemoryFree(pSupp->rowEntryInfoOffset); taosMemoryFree(pSupp->rowEntryInfoOffset);
} }
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h"
SQWorkerMgmt gQwMgmt = { SQWorkerMgmt gQwMgmt = {
.lock = 0, .lock = 0,
...@@ -92,6 +93,16 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ...@@ -92,6 +93,16 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) {
if ((!quickRsp) || QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy) {
qwBuildAndSendQueryRsp(msgType, &ctx->ctrlConnInfo, rspCode, ctx);
ctx->queryRsped = true;
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, rspCode, tstrerror(rspCode));
}
return TSDB_CODE_SUCCESS;
}
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
int32_t code = 0; int32_t code = 0;
bool qcontinue = true; bool qcontinue = true;
...@@ -411,6 +422,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -411,6 +422,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
} }
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed cause of %s, phase:%s", tstrerror(ctx->rspCode), qwPhaseStr(phase));
QW_ERR_JRET(ctx->rspCode);
}
if (!ctx->queryRsped) { if (!ctx->queryRsped) {
QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase)); QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
...@@ -423,6 +439,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -423,6 +439,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed cause of %s, phase:%s", tstrerror(ctx->rspCode), qwPhaseStr(phase));
QW_ERR_JRET(ctx->rspCode);
}
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
...@@ -506,22 +527,15 @@ _return: ...@@ -506,22 +527,15 @@ _return:
ctx->queryGotData = true; ctx->queryGotData = true;
} }
#if 0 if (QW_PHASE_POST_QUERY == phase && ctx && !ctx->localExec && !ctx->queryRsped) {
if (QW_PHASE_POST_QUERY == phase && ctx) { bool rsped = false;
if (!ctx->localExec) { SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
bool rsped = false; qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); if (!rsped) {
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false);
if (!rsped) {
qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
}
} }
ctx->queryRsped = true;
} }
#endif
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
...@@ -558,20 +572,12 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -558,20 +572,12 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
ctx->ctrlConnInfo = qwMsg->connInfo; ctx->ctrlConnInfo = qwMsg->connInfo;
ctx->phase = -1;
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
_return: _return:
#if 1
qwBuildAndSendQueryRsp(qwMsg->msgType + 1, &qwMsg->connInfo, code, ctx);
if (ctx) {
ctx->queryRsped = true;
ctx->phase = -1;
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
}
#endif
if (ctx) { if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
...@@ -620,6 +626,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { ...@@ -620,6 +626,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true);
ctx->level = plan->level; ctx->level = plan->level;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle); atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
...@@ -660,7 +668,7 @@ _return: ...@@ -660,7 +668,7 @@ _return:
} }
} }
QW_RET(code); QW_RET(TSDB_CODE_SUCCESS);
} }
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册