提交 1427a8f3 编写于 作者: D dapan1121

enh: support client query policy

上级 86758572
......@@ -29,6 +29,13 @@ typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan;
typedef int32_t (*localFetchFp)(void *handle, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp);
typedef struct {
void *handle;
localFetchFp fp;
} SLocalFetch;
typedef struct {
void* tqReader;
void* meta;
......@@ -127,7 +134,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
* @param handle
* @return
*/
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds);
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal);
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
/**
......
......@@ -393,6 +393,7 @@ typedef struct SDownstreamSourceNode {
uint64_t schedId;
int32_t execId;
int32_t fetchMsgType;
bool localExec;
} SDownstreamSourceNode;
typedef struct SExchangePhysiNode {
......
......@@ -56,6 +56,23 @@ typedef struct {
uint64_t numOfErrors;
} SQWorkerStat;
typedef struct SQWMsgInfo {
int8_t taskType;
int8_t explain;
int8_t needFetch;
} SQWMsgInfo;
typedef struct SQWMsg {
void *node;
int32_t code;
int32_t msgType;
void *msg;
int32_t msgLen;
SQWMsgInfo msgInfo;
SRpcHandleInfo connInfo;
} SQWMsg;
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb);
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg);
......@@ -78,10 +95,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes);
void qWorkerDestroy(void **qWorkerMgmt);
void qWorkerDestroy(void **qWorkerMgmt);
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat);
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg);
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp);
#ifdef __cplusplus
}
#endif
......
......@@ -365,7 +365,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
}
bool qnodeRequired(SRequestObj* pRequest) {
if (QUERY_POLICY_VNODE == tsQueryPolicy) {
if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
return false;
}
......@@ -689,7 +689,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
.requestObjRefId = pRequest->self};
SSchedulerReq req = {
.syncReq = true,
.localReq = (tsQueryPolicy == CLIENT_HANDLE),
.localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
......@@ -1065,7 +1065,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
SSchedulerReq req = {
.syncReq = false,
.localReq = (tsQueryPolicy == CLIENT_HANDLE),
.localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
......
......@@ -76,8 +76,6 @@ void taos_cleanup(void) {
cleanupTaskQueue();
qWorkerDestroy(&tscQueryMgmt);
taosConvDestroy();
tscInfo("all local resources released");
......
......@@ -285,7 +285,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
......
......@@ -641,7 +641,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
while (1) {
uint64_t ts;
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts);
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, NULL);
if (code < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) {
break;
......
......@@ -183,6 +183,7 @@ typedef struct SExecTaskInfo {
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SSubplan* pSubplan;
struct SOperatorInfo* pRoot;
SLocalFetch localFetch;
} SExecTaskInfo;
enum {
......
......@@ -450,10 +450,15 @@ static void freeBlock(void* param) {
blockDataDestroy(pBlock);
}
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
if (pLocal) {
pTaskInfo->localFetch.handle = pLocal->handle;
pTaskInfo->localFetch.fp = pLocal->fp;
}
taosArrayClearEx(pResList, freeBlock);
int64_t curOwner = 0;
......
......@@ -1999,38 +1999,45 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
pMsg->execId = htonl(pSource->execId);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
taosMemoryFreeClear(pMsg);
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pTaskInfo->code;
}
SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
pWrapper->exchangeId = pExchangeInfo->self;
pWrapper->sourceIndex = sourceIndex;
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = pSource->fetchMsgType;
pMsgSendInfo->fp = loadRemoteDataCallback;
if (pSource->localExec) {
SDataBuf pBuf = {0};
int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData);
loadRemoteDataCallback(pWrapper, &pBuf, code);
} else {
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
pMsg->execId = htonl(pSource->execId);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
taosMemoryFreeClear(pMsg);
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pTaskInfo->code;
}
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = pSource->fetchMsgType;
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
}
return TSDB_CODE_SUCCESS;
}
......@@ -3963,7 +3970,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
pOperator = createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
if (pHandle->vnode) {
......@@ -4474,8 +4481,10 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
if (!pTaskInfo->localFetch.fp) {
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
}
taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str);
taosMemoryFreeClear(pTaskInfo);
......
......@@ -609,6 +609,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre
COPY_SCALAR_FIELD(schedId);
COPY_SCALAR_FIELD(execId);
COPY_SCALAR_FIELD(fetchMsgType);
COPY_SCALAR_FIELD(localExec);
return TSDB_CODE_SUCCESS;
}
......
......@@ -83,22 +83,6 @@ typedef struct SQWDebug {
extern SQWDebug gQWDebug;
typedef struct SQWMsgInfo {
int8_t taskType;
int8_t explain;
int8_t needFetch;
} SQWMsgInfo;
typedef struct SQWMsg {
void *node;
int32_t code;
int32_t msgType;
void *msg;
int32_t msgLen;
SQWMsgInfo msgInfo;
SRpcHandleInfo connInfo;
} SQWMsg;
typedef struct SQWHbParam {
bool inUse;
int32_t qwrId;
......@@ -133,6 +117,7 @@ typedef struct SQWTaskCtx {
int8_t taskType;
int8_t explain;
int8_t needFetch;
int8_t localExec;
int32_t msgType;
int32_t fetchType;
int32_t execId;
......@@ -150,6 +135,7 @@ typedef struct SQWTaskCtx {
int8_t events[QW_EVENT_MAX];
SArray *explainRes;
void *taskHandle;
void *sinkHandle;
STbVerInfo tbInfo;
......
......@@ -42,7 +42,7 @@ int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList);
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code);
void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp);
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn);
......
......@@ -9,10 +9,10 @@
#include "tmsg.h"
#include "tname.h"
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) {
int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*rsp, msgSize);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)(rpcMalloc ? rpcReallocCont(*rsp, msgSize) : taosMemoryRealloc(*rsp, msgSize));
if (NULL == pRsp) {
qError("rpcMallocCont %d failed", msgSize);
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......
......@@ -57,11 +57,15 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList));
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
connInfo.ahandle = NULL;
int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
taosArrayDestroyEx(execInfoList, freeItem);
QW_ERR_RET(code);
if (ctx->localExec) {
} else {
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
connInfo.ahandle = NULL;
int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
taosArrayDestroyEx(execInfoList, freeItem);
QW_ERR_RET(code);
}
}
if (!ctx->needFetch) {
......@@ -80,6 +84,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
int32_t execNum = 0;
qTaskInfo_t taskHandle = ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle;
SLocalFetch localFetch = {(void*)mgmt, qWorkerProcessLocalFetch};
SArray *pResList = taosArrayInit(4, POINTER_BYTES);
while (true) {
......@@ -88,7 +93,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
// if *taskHandle is NULL, it's killed right now
if (taskHandle) {
qwDbgSimulateSleep();
code = qExecTaskOpt(taskHandle, pResList, &useconds);
code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch);
if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
......@@ -229,7 +234,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
if (NULL == rsp) {
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp));
*pOutput = output;
} else {
pOutput->queryEnd = output.queryEnd;
......@@ -250,7 +255,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
*dataLen += len;
QW_ERR_RET(qwMallocFetchRsp(*dataLen, &rsp));
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp));
output.pData = rsp->data + *dataLen - len;
code = dsGetDataBlock(ctx->sinkHandle, &output);
......@@ -474,16 +479,18 @@ _return:
}
if (QW_PHASE_POST_QUERY == phase && ctx) {
ctx->queryRsped = true;
bool rsped = false;
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
if (!rsped) {
qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
if (!ctx->localExec) {
bool rsped = false;
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
if (!rsped) {
qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
}
}
ctx->queryRsped = true;
}
if (ctx) {
......@@ -551,6 +558,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->msgType = qwMsg->msgType;
ctx->localExec = false;
// QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
......@@ -1034,7 +1042,11 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
mgmt->nodeType = nodeType;
mgmt->nodeId = nodeId;
mgmt->msgCb = pMsgCb ? *pMsgCb : NULL;
if (pMsgCb) {
mgmt->msgCb = *pMsgCb;
} else {
memset(&mgmt->msgCb, 0, sizeof(mgmt->msgCb));
}
mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
if (mgmt->refId < 0) {
......@@ -1111,26 +1123,33 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessLocalQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes) {
SQWorker *mgmt = (SQWorker*)pMgmt;
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
SSubplan *plan = (SSubplan *)qwMsg->msg;
SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SReadHandle rHandle = {0};
QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
ctx->taskType = qwMsg->msgInfo.taskType;
ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->msgType = qwMsg->msgType;
ctx->localExec = true;
ctx->explainRes = explainRes;
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle;
code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
......@@ -1149,6 +1168,12 @@ int32_t qWorkerProcessLocalQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
_return:
taosMemoryFree(rHandle.pMsgCb);
input.code = code;
input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);
qwReleaseTaskCtx(mgmt, ctx);
......@@ -1157,4 +1182,52 @@ _return:
QW_RET(code);
}
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes) {
SQWorker *mgmt = (SQWorker*)pMgmt;
int32_t code = 0;
int32_t dataLen = 0;
SQWTaskCtx *ctx = NULL;
void *rsp = NULL;
bool queryStop = false;
SQWPhaseInput input = {0};
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->msgType = TDMT_SCH_MERGE_FETCH;
ctx->explainRes = explainRes;
SOutputData sOutput = {0};
while (true) {
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if (NULL == rsp) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));
continue;
} else {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true);
}
break;
}
}
_return:
*pRsp = rsp;
input.code = code;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);
QW_RET(code);
}
......@@ -9,7 +9,7 @@ target_include_directories(
target_link_libraries(
scheduler
PUBLIC os util nodes planner qcom common catalog transport command
PUBLIC os util nodes planner qcom common catalog transport command qworker executor
)
if(${BUILD_TEST})
......
......@@ -264,7 +264,7 @@ typedef struct SSchJob {
SHashObj *taskList;
SHashObj *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask*
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
SExplainCtx *explainCtx;
int8_t status;
SQueryNodeAddr resNode;
......@@ -305,6 +305,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task))
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && SCH_IS_DATA_MERGE_TASK(_task))
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
......@@ -503,6 +504,7 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode);
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode);
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode);
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync);
int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
extern SSchDebug gSCHDebug;
......
......@@ -72,6 +72,71 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
return TSDB_CODE_SUCCESS;
}
int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
int32_t code = 0;
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (SCH_IS_EXPLAIN_JOB(pJob)) {
if (rsp->completed) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schLaunchFetchTask(pJob));
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS;
}
if (pJob->fetchRes) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
atomic_store_ptr(&pJob->fetchRes, rsp);
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
}
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);
msg = NULL;
schProcessOnDataFetched(pJob);
_return:
taosMemoryFreeClear(msg);
SCH_RET(code);
}
int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
return TSDB_CODE_SUCCESS;
}
// Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
......@@ -305,61 +370,14 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp));
break;
}
case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (SCH_IS_EXPLAIN_JOB(pJob)) {
if (rsp->completed) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schLaunchFetchTask(pJob));
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS;
}
if (pJob->fetchRes) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes);
taosMemoryFreeClear(rsp);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
atomic_store_ptr(&pJob->fetchRes, rsp);
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
}
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);
code = schProcessFetchRsp(pJob, pTask, msg, rspCode);
msg = NULL;
schProcessOnDataFetched(pJob);
SCH_ERR_JRET(code);
break;
}
case TDMT_SCH_DROP_TASK_RSP: {
......
......@@ -20,6 +20,7 @@
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "qworker.h"
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask);
......@@ -89,6 +90,10 @@ _return:
}
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
return TSDB_CODE_SUCCESS;
}
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
......@@ -292,6 +297,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
.execId = pTask->execId,
.addr = pTask->succeedAddr,
.fetchMsgType = SCH_FETCH_TYPE(pTask),
.localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
};
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &parent->planLock);
......@@ -847,15 +853,21 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
if (NULL == schMgmt.queryMgmt) {
SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL));
}
SArray *explainRes = NULL;
SQWMsg qwMsg = {0};
qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
qwMsg.msg = pTask->plan;
qwMsg.msgType = pTask->plan->msgType;
SCH_ERR_RET(qWorkerProcessLocalQuery((SQWorker*)schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg));
qwMsg.connInfo.handle = pJob->conn.pTrans;
if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, POINTER_BYTES);
}
SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes));
SCH_RET(schProcessOnTaskSuccess(pJob, pTask));
}
......@@ -878,7 +890,8 @@ int32_t schLaunchTaskImpl(void *param) {
pTask->retryTimes++;
pTask->waitRetry = false;
SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes);
SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d", SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE",
pTask->execId, pTask->retryTimes);
SCH_LOG_TASK_START_TS(pTask);
......@@ -893,7 +906,7 @@ int32_t schLaunchTaskImpl(void *param) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
}
if (pJob->attr.localExec && SCH_IS_QUERY_JOB(pJob) && SCH_IS_DATA_MERGE_TASK(pTask)) {
if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
} else {
SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
......@@ -986,6 +999,25 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
}
}
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask)));
}
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
void *pRsp = NULL;
SArray *explainRes = NULL;
if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, POINTER_BYTES);
}
SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes));
SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));
return TSDB_CODE_SUCCESS;
}
// Note: no more error processing, handled in function internal
int32_t schLaunchFetchTask(SSchJob *pJob) {
int32_t code = 0;
......@@ -996,7 +1028,11 @@ int32_t schLaunchFetchTask(SSchJob *pJob) {
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask)));
if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
} else {
SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
}
return TSDB_CODE_SUCCESS;
......
......@@ -17,6 +17,7 @@
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "qworker.h"
SSchedulerMgmt schMgmt = {
.jobRef = -1,
......@@ -192,4 +193,7 @@ void schedulerDestroy(void) {
schMgmt.hbConnections = NULL;
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
qWorkerDestroy(&schMgmt.queryMgmt);
schMgmt.queryMgmt = NULL;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册