未验证 提交 983f4797 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16862 from taosdata/enh/clientPolicy

enh: support client merge policy
......@@ -1424,6 +1424,14 @@ typedef struct {
SExplainExecInfo* subplanInfo;
} SExplainRsp;
typedef struct {
SExplainRsp rsp;
uint64_t qId;
uint64_t tId;
int64_t rId;
int32_t eId;
} SExplainLocalRsp;
typedef struct STableScanAnalyzeInfo {
uint64_t totalRows;
uint64_t totalCheckedRows;
......@@ -1438,6 +1446,7 @@ typedef struct STableScanAnalyzeInfo {
int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
void tFreeSExplainRsp(SExplainRsp *pRsp);
typedef struct {
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
......
......@@ -29,6 +29,15 @@ typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan;
typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*);
typedef struct {
void *handle;
bool localExec;
localFetchFp fp;
SArray *explainRes;
} SLocalFetch;
typedef struct {
void* tqReader;
void* meta;
......@@ -127,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
* @param handle
* @return
*/
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds);
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal);
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
/**
......
......@@ -396,6 +396,7 @@ typedef struct SDownstreamSourceNode {
uint64_t schedId;
int32_t execId;
int32_t fetchMsgType;
bool localExec;
} SDownstreamSourceNode;
typedef struct SExchangePhysiNode {
......
......@@ -52,6 +52,7 @@ typedef enum {
#define QUERY_POLICY_VNODE 1
#define QUERY_POLICY_HYBRID 2
#define QUERY_POLICY_QNODE 3
#define QUERY_POLICY_CLIENT 4
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
......@@ -269,43 +270,43 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define qFatal(...) \
do { \
if (qDebugFlag & DEBUG_FATAL) { \
taosPrintLog("QRY FATAL ", DEBUG_FATAL, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qError(...) \
do { \
if (qDebugFlag & DEBUG_ERROR) { \
taosPrintLog("QRY ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qWarn(...) \
do { \
if (qDebugFlag & DEBUG_WARN) { \
taosPrintLog("QRY WARN ", DEBUG_WARN, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qInfo(...) \
do { \
if (qDebugFlag & DEBUG_INFO) { \
taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebug(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qTrace(...) \
do { \
if (qDebugFlag & DEBUG_TRACE) { \
taosPrintLog("QRY ", DEBUG_TRACE, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebugL(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLongString("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
......
......@@ -29,6 +29,7 @@ enum {
NODE_TYPE_QNODE,
NODE_TYPE_SNODE,
NODE_TYPE_MNODE,
NODE_TYPE_CLIENT,
};
typedef struct SQWorkerCfg {
......@@ -55,7 +56,24 @@ typedef struct {
uint64_t numOfErrors;
} SQWorkerStat;
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb);
typedef struct SQWMsgInfo {
int8_t taskType;
int8_t explain;
int8_t needFetch;
} SQWMsgInfo;
typedef struct SQWMsg {
void *node;
int32_t code;
int32_t msgType;
void *msg;
int32_t msgLen;
SQWMsgInfo msgInfo;
SRpcHandleInfo connInfo;
} SQWMsg;
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb);
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg);
......@@ -77,10 +95,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes);
void qWorkerDestroy(void **qWorkerMgmt);
void qWorkerDestroy(void **qWorkerMgmt);
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat);
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes);
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes);
#ifdef __cplusplus
}
#endif
......
......@@ -64,6 +64,7 @@ typedef bool (*schedulerChkKillFp)(void* param);
typedef struct SSchedulerReq {
bool syncReq;
bool localReq;
SRequestConnInfo *pConn;
SArray *pNodeList;
SQueryPlan *pDag;
......
......@@ -483,6 +483,7 @@ enum {
#define SNODE_HANDLE -2
#define VNODE_HANDLE -3
#define BNODE_HANDLE -4
#define CLIENT_HANDLE -5
#define TSDB_CONFIG_OPTION_LEN 32
#define TSDB_CONFIG_VALUE_LEN 64
......
......@@ -27,6 +27,7 @@
#include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#include "qworker.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
......
......@@ -379,7 +379,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
}
bool qnodeRequired(SRequestObj* pRequest) {
if (QUERY_POLICY_VNODE == tsQueryPolicy) {
if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
return false;
}
......@@ -483,7 +483,8 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
char *policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
int32_t dbNum = taosArrayGetSize(pDbVgList);
for (int32_t i = 0; i < dbNum; ++i) {
SArray* pVg = taosArrayGetP(pDbVgList, i);
......@@ -504,20 +505,20 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr
int32_t vnodeNum = taosArrayGetSize(nodeList);
if (vnodeNum > 0) {
tscDebug("0x%" PRIx64 " vnode policy, use vnode list, num:%d", pRequest->requestId, vnodeNum);
tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
goto _return;
}
int32_t mnodeNum = taosArrayGetSize(pMnodeList);
if (mnodeNum <= 0) {
tscDebug("0x%" PRIx64 " vnode policy, empty node list", pRequest->requestId);
tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
goto _return;
}
void* pData = taosArrayGet(pMnodeList, 0);
taosArrayAddBatch(nodeList, pData, mnodeNum);
tscDebug("0x%" PRIx64 " vnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
_return:
......@@ -561,7 +562,8 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
int32_t code = 0;
switch (tsQueryPolicy) {
case QUERY_POLICY_VNODE: {
case QUERY_POLICY_VNODE:
case QUERY_POLICY_CLIENT: {
if (pResultMeta) {
pDbVgList = taosArrayInit(4, POINTER_BYTES);
......@@ -622,7 +624,8 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray*
int32_t code = 0;
switch (tsQueryPolicy) {
case QUERY_POLICY_VNODE: {
case QUERY_POLICY_VNODE:
case QUERY_POLICY_CLIENT: {
int32_t dbNum = taosArrayGetSize(pRequest->dbList);
if (dbNum > 0) {
SCatalog* pCtg = NULL;
......@@ -682,6 +685,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
.requestObjRefId = pRequest->self};
SSchedulerReq req = {
.syncReq = true,
.localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
......@@ -1061,6 +1065,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
SSchedulerReq req = {
.syncReq = false,
.localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
......
......@@ -26,6 +26,7 @@
#include "tref.h"
#include "trpc.h"
#include "version.h"
#include "qworker.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
......
......@@ -285,7 +285,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1;
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1;
......
......@@ -4365,7 +4365,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfPlans) < 0) return -1;
if (pRsp->numOfPlans > 0) {
pRsp->subplanInfo = taosMemoryMalloc(pRsp->numOfPlans * sizeof(SExplainExecInfo));
pRsp->subplanInfo = taosMemoryCalloc(pRsp->numOfPlans, sizeof(SExplainExecInfo));
if (pRsp->subplanInfo == NULL) return -1;
}
for (int32_t i = 0; i < pRsp->numOfPlans; ++i) {
......@@ -4373,7 +4373,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
if (tDecodeDouble(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].numOfRows) < 0) return -1;
if (tDecodeU32(&decoder, &pRsp->subplanInfo[i].verboseLen) < 0) return -1;
if (tDecodeBinary(&decoder, (uint8_t **)&pRsp->subplanInfo[i].verboseInfo, &pRsp->subplanInfo[i].verboseLen) < 0)
if (tDecodeBinaryAlloc(&decoder, &pRsp->subplanInfo[i].verboseInfo, NULL) < 0)
return -1;
}
......@@ -4383,6 +4383,19 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
return 0;
}
void tFreeSExplainRsp(SExplainRsp *pRsp) {
if (NULL == pRsp) {
return;
}
for (int32_t i = 0; i < pRsp->numOfPlans; ++i) {
SExplainExecInfo *pExec = pRsp->subplanInfo + i;
taosMemoryFree(pExec->verboseInfo);
}
taosMemoryFreeClear(pRsp->subplanInfo);
}
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
......
......@@ -170,7 +170,7 @@ _exit:
}
int32_t mndInitQuery(SMnode *pMnode) {
if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) {
if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) {
mError("failed to init qworker in mnode since %s", terrstr());
return -1;
}
......
......@@ -26,7 +26,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
return NULL;
}
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) {
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, (void **)&pQnode->pQuery, &pOption->msgCb)) {
taosMemoryFreeClear(pQnode);
return NULL;
}
......
......@@ -690,7 +690,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
while (1) {
uint64_t ts;
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts);
int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, NULL);
if (code < 0) {
if (code == TSDB_CODE_QRY_IN_EXEC) {
break;
......
......@@ -16,7 +16,7 @@
#include "vnd.h"
int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb);
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb);
}
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
......
......@@ -99,8 +99,10 @@ extern "C" {
typedef struct SExplainGroup {
int32_t nodeNum;
int32_t nodeIdx;
int32_t physiPlanExecNum;
int32_t physiPlanExecIdx;
bool singleChannel;
SRWLatch lock;
SSubplan *plan;
SArray *nodeExecInfo; //Array<SExplainRsp>
......
......@@ -21,14 +21,14 @@
#include "tdatablock.h"
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level);
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel);
void qExplainFreeResNode(SExplainResNode *resNode) {
if (NULL == resNode) {
return;
}
taosMemoryFreeClear(resNode->pExecInfo);
taosArrayDestroy(resNode->pExecInfo);
SNode *node = NULL;
FOREACH(node, resNode->pChildren) { qExplainFreeResNode((SExplainResNode *)node); }
......@@ -56,8 +56,9 @@ void qExplainFreeCtx(SExplainCtx *pCtx) {
int32_t num = taosArrayGetSize(group->nodeExecInfo);
for (int32_t i = 0; i < num; ++i) {
SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i);
taosMemoryFreeClear(rsp->subplanInfo);
tFreeSExplainRsp(rsp);
}
taosArrayDestroy(group->nodeExecInfo);
}
pIter = taosHashIterate(pCtx->groupHash, pIter);
......@@ -66,6 +67,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) {
taosHashCleanup(pCtx->groupHash);
taosArrayDestroy(pCtx->rows);
taosMemoryFreeClear(pCtx->tbuf);
taosMemoryFree(pCtx);
}
......@@ -248,7 +250,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
return TSDB_CODE_SUCCESS;
}
int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group) {
int32_t qExplainGenerateResNodeExecInfo(SPhysiNode *pNode, SArray **pExecInfo, SExplainGroup *group) {
*pExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainExecInfo));
if (NULL == (*pExecInfo)) {
qError("taosArrayInit %d explainExecInfo failed", group->nodeNum);
......@@ -256,17 +258,28 @@ int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group
}
SExplainRsp *rsp = NULL;
for (int32_t i = 0; i < group->nodeNum; ++i) {
rsp = taosArrayGet(group->nodeExecInfo, i);
/*
if (group->physiPlanExecIdx >= rsp->numOfPlans) {
qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
return TSDB_CODE_QRY_APP_ERROR;
}
if (group->singleChannel) {
if (0 == group->physiPlanExecIdx) {
group->nodeIdx = 0;
}
rsp = taosArrayGet(group->nodeExecInfo, group->nodeIdx++);
if (group->physiPlanExecIdx >= rsp->numOfPlans) {
qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
return TSDB_CODE_QRY_APP_ERROR;
}
taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
} else {
for (int32_t i = 0; i < group->nodeNum; ++i) {
rsp = taosArrayGet(group->nodeExecInfo, i);
if (group->physiPlanExecIdx >= rsp->numOfPlans) {
qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
return TSDB_CODE_QRY_APP_ERROR;
}
taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
*/
taosArrayPush(*pExecInfo, rsp->subplanInfo);
taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
}
}
++group->physiPlanExecIdx;
......@@ -291,7 +304,7 @@ int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplai
resNode->pNode = pNode;
if (group->nodeExecInfo) {
QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(&resNode->pExecInfo, group));
QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(pNode, &resNode->pExecInfo, group));
}
QRY_ERR_JRET(qExplainGenerateResChildren(pNode, group, &resNode->pChildren));
......@@ -801,7 +814,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
}
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1));
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1, pExchNode->singleChannel));
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SORT: {
......@@ -1533,7 +1546,7 @@ int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32
return TSDB_CODE_SUCCESS;
}
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) {
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel) {
SExplainResNode *node = NULL;
int32_t code = 0;
SExplainCtx *ctx = (SExplainCtx *)pCtx;
......@@ -1544,6 +1557,9 @@ int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) {
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
group->singleChannel = singleChannel;
group->physiPlanExecIdx = 0;
QRY_ERR_RET(qExplainGenerateResNode(group->plan->pNode, group, &node));
QRY_ERR_JRET(qExplainResNodeToRows(node, ctx, level));
......@@ -1707,7 +1723,7 @@ int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) {
}
int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0));
QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0, false));
QRY_ERR_RET(qExplainAppendPlanRows(pCtx));
QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp));
......@@ -1723,7 +1739,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t
SExplainGroup *group = taosHashGet(ctx->groupHash, &groupId, sizeof(groupId));
if (NULL == group) {
qError("group %d not in groupHash", groupId);
taosMemoryFreeClear(pRspMsg->subplanInfo);
tFreeSExplainRsp(pRspMsg);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -1732,7 +1748,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t
group->nodeExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainRsp));
if (NULL == group->nodeExecInfo) {
qError("taosArrayInit %d explainExecInfo failed", group->nodeNum);
taosMemoryFreeClear(pRspMsg->subplanInfo);
tFreeSExplainRsp(pRspMsg);
taosWUnLockLatch(&group->lock);
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1742,7 +1758,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t
} else if (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum) {
qError("group execInfo already full, size:%d, nodeNum:%d", (int32_t)taosArrayGetSize(group->nodeExecInfo),
group->nodeNum);
taosMemoryFreeClear(pRspMsg->subplanInfo);
tFreeSExplainRsp(pRspMsg);
taosWUnLockLatch(&group->lock);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
......@@ -1751,13 +1767,14 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t
if (group->physiPlanExecNum != pRspMsg->numOfPlans) {
qError("physiPlanExecNum %d mismatch with others %d in group %d", pRspMsg->numOfPlans, group->physiPlanExecNum,
groupId);
taosMemoryFreeClear(pRspMsg->subplanInfo);
tFreeSExplainRsp(pRspMsg);
taosWUnLockLatch(&group->lock);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
taosArrayPush(group->nodeExecInfo, pRspMsg);
groupDone = (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum);
taosWUnLockLatch(&group->lock);
......
......@@ -183,6 +183,7 @@ typedef struct SExecTaskInfo {
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SSubplan* pSubplan;
struct SOperatorInfo* pRoot;
SLocalFetch localFetch;
} SExecTaskInfo;
enum {
......
......@@ -479,10 +479,14 @@ static void freeBlock(void* param) {
blockDataDestroy(pBlock);
}
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
if (pLocal) {
memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
}
taosArrayClearEx(pResList, freeBlock);
int64_t curOwner = 0;
......
......@@ -1776,49 +1776,57 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
if (NULL == pMsg) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pTaskInfo->code;
}
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
pMsg->execId = htonl(pSource->execId);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
taosMemoryFreeClear(pMsg);
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pTaskInfo->code;
}
SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
pWrapper->exchangeId = pExchangeInfo->self;
pWrapper->sourceIndex = sourceIndex;
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = pSource->fetchMsgType;
pMsgSendInfo->fp = loadRemoteDataCallback;
if (pSource->localExec) {
SDataBuf pBuf = {0};
int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
loadRemoteDataCallback(pWrapper, &pBuf, code);
taosMemoryFree(pWrapper);
} else {
SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq));
if (NULL == pMsg) {
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pTaskInfo->code;
}
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId,
sourceIndex, totalSources);
pMsg->header.vgId = htonl(pSource->addr.nodeId);
pMsg->sId = htobe64(pSource->schedId);
pMsg->taskId = htobe64(pSource->taskId);
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
pMsg->execId = htonl(pSource->execId);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
taosMemoryFreeClear(pMsg);
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return pTaskInfo->code;
}
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = pSource->fetchMsgType;
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
}
return TSDB_CODE_SUCCESS;
}
......@@ -3531,7 +3539,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
pOperator = createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
if (pHandle->vnode) {
......@@ -4047,8 +4055,10 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
cleanupStreamInfo(&pTaskInfo->streamInfo);
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
if (!pTaskInfo->localFetch.localExec) {
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
}
taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str);
taosMemoryFreeClear(pTaskInfo);
......
......@@ -627,6 +627,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre
COPY_SCALAR_FIELD(schedId);
COPY_SCALAR_FIELD(execId);
COPY_SCALAR_FIELD(fetchMsgType);
COPY_SCALAR_FIELD(localExec);
return TSDB_CODE_SUCCESS;
}
......
......@@ -29,7 +29,7 @@ extern "C" {
#include "executor.h"
#include "trpc.h"
#define QW_DEFAULT_SCHEDULER_NUMBER 10000
#define QW_DEFAULT_SCHEDULER_NUMBER 100
#define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2
......@@ -83,22 +83,6 @@ typedef struct SQWDebug {
extern SQWDebug gQWDebug;
typedef struct SQWMsgInfo {
int8_t taskType;
int8_t explain;
int8_t needFetch;
} SQWMsgInfo;
typedef struct SQWMsg {
void *node;
int32_t code;
int32_t msgType;
char *msg;
int32_t msgLen;
SQWMsgInfo msgInfo;
SRpcHandleInfo connInfo;
} SQWMsg;
typedef struct SQWHbParam {
bool inUse;
int32_t qwrId;
......@@ -133,6 +117,7 @@ typedef struct SQWTaskCtx {
int8_t taskType;
int8_t explain;
int8_t needFetch;
int8_t localExec;
int32_t msgType;
int32_t fetchType;
int32_t execId;
......@@ -150,6 +135,7 @@ typedef struct SQWTaskCtx {
int8_t events[QW_EVENT_MAX];
SArray *explainRes;
void *taskHandle;
void *sinkHandle;
STbVerInfo tbInfo;
......
......@@ -42,7 +42,7 @@ int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList);
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code);
void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp);
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn);
......
......@@ -9,10 +9,10 @@
#include "tmsg.h"
#include "tname.h"
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) {
int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*rsp, msgSize);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)(rpcMalloc ? rpcReallocCont(*rsp, msgSize) : taosMemoryRealloc(*rsp, msgSize));
if (NULL == pRsp) {
qError("rpcMallocCont %d failed", msgSize);
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......
......@@ -63,11 +63,25 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList));
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
connInfo.ahandle = NULL;
int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
taosArrayDestroyEx(execInfoList, freeItem);
QW_ERR_RET(code);
if (ctx->localExec) {
SExplainLocalRsp localRsp = {0};
localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList);
SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo));
memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
localRsp.rsp.subplanInfo = pExec;
localRsp.qId = qId;
localRsp.tId = tId;
localRsp.rId = rId;
localRsp.eId = eId;
taosArrayPush(ctx->explainRes, &localRsp);
taosArrayDestroy(execInfoList);
} else {
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
connInfo.ahandle = NULL;
int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
taosArrayDestroyEx(execInfoList, freeItem);
QW_ERR_RET(code);
}
}
if (!ctx->needFetch) {
......@@ -86,6 +100,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
int32_t execNum = 0;
qTaskInfo_t taskHandle = ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle;
SLocalFetch localFetch = {(void*)mgmt, ctx->localExec, qWorkerProcessLocalFetch, ctx->explainRes};
SArray *pResList = taosArrayInit(4, POINTER_BYTES);
while (true) {
......@@ -94,8 +109,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
// if *taskHandle is NULL, it's killed right now
if (taskHandle) {
qwDbgSimulateSleep();
code = qExecTaskOpt(taskHandle, pResList, &useconds);
code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch);
if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
......@@ -235,7 +249,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
if (NULL == rsp) {
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp));
*pOutput = output;
} else {
pOutput->queryEnd = output.queryEnd;
......@@ -256,7 +270,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
*dataLen += len;
QW_ERR_RET(qwMallocFetchRsp(*dataLen, &rsp));
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp));
output.pData = rsp->data + *dataLen - len;
code = dsGetDataBlock(ctx->sinkHandle, &output);
......@@ -480,16 +494,18 @@ _return:
}
if (QW_PHASE_POST_QUERY == phase && ctx) {
ctx->queryRsped = true;
bool rsped = false;
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
if (!rsped) {
qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
if (!ctx->localExec) {
bool rsped = false;
SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo};
qwDbgSimulateRedirect(&qwMsg, ctx, &rsped);
qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped);
if (!rsped) {
qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
}
}
ctx->queryRsped = true;
}
if (ctx) {
......@@ -518,11 +534,6 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) {
int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t code = 0;
bool queryRsped = false;
SSubplan *plan = NULL;
SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
......@@ -562,6 +573,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->msgType = qwMsg->msgType;
ctx->localExec = false;
// QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
......@@ -584,19 +596,12 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
// QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
// QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
// queryRsped = true;
ctx->level = plan->level;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
if (pTaskInfo && sinkHandle) {
qwSaveTbVersionInfo(pTaskInfo, ctx);
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
}
qwSaveTbVersionInfo(pTaskInfo, ctx);
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
_return:
......@@ -606,11 +611,6 @@ _return:
input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
// if (!queryRsped) {
// qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
// QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
//}
QW_RET(TSDB_CODE_SUCCESS);
}
......@@ -1006,8 +1006,8 @@ _return:
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) {
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) {
qError("invalid param to init qworker");
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -1030,22 +1030,9 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (cfg) {
mgmt->cfg = *cfg;
if (0 == mgmt->cfg.maxSchedulerNum) {
mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
}
if (0 == mgmt->cfg.maxTaskNum) {
mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
}
if (0 == mgmt->cfg.maxSchTaskNum) {
mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
}
} else {
mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
}
mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER;
mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER;
mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER;
mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
HASH_ENTRY_LOCK);
......@@ -1070,7 +1057,11 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
mgmt->nodeType = nodeType;
mgmt->nodeId = nodeId;
mgmt->msgCb = *pMsgCb;
if (pMsgCb) {
mgmt->msgCb = *pMsgCb;
} else {
memset(&mgmt->msgCb, 0, sizeof(mgmt->msgCb));
}
mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt);
if (mgmt->refId < 0) {
......@@ -1153,3 +1144,112 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes) {
SQWorker *mgmt = (SQWorker*)pMgmt;
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
SSubplan *plan = (SSubplan *)qwMsg->msg;
SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
SReadHandle rHandle = {0};
QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS()));
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
ctx->taskType = qwMsg->msgInfo.taskType;
ctx->explain = qwMsg->msgInfo.explain;
ctx->needFetch = qwMsg->msgInfo.needFetch;
ctx->msgType = qwMsg->msgType;
ctx->localExec = true;
ctx->explainRes = explainRes;
rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb));
rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle;
code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
}
if (NULL == sinkHandle || NULL == pTaskInfo) {
QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
ctx->level = plan->level;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
_return:
taosMemoryFree(rHandle.pMsgCb);
input.code = code;
input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
if (ctx) {
QW_UPDATE_RSP_CODE(ctx, code);
qwReleaseTaskCtx(mgmt, ctx);
}
QW_RET(code);
}
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes) {
SQWorker *mgmt = (SQWorker*)pMgmt;
int32_t code = 0;
int32_t dataLen = 0;
SQWTaskCtx *ctx = NULL;
void *rsp = NULL;
bool queryStop = false;
SQWPhaseInput input = {0};
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL));
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->msgType = TDMT_SCH_MERGE_FETCH;
ctx->explainRes = explainRes;
SOutputData sOutput = {0};
while (true) {
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if (NULL == rsp) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));
continue;
} else {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
if (qComplete) {
atomic_store_8((int8_t *)&ctx->queryEnd, true);
}
break;
}
}
_return:
*pRsp = rsp;
input.code = code;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL);
QW_RET(code);
}
......@@ -877,7 +877,7 @@ TEST(seqTest, normalCase) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
......@@ -913,7 +913,7 @@ TEST(seqTest, cancelFirst) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
......@@ -950,7 +950,7 @@ TEST(seqTest, randCase) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
int32_t t = 0;
......@@ -1021,7 +1021,7 @@ TEST(seqTest, multithreadRand) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
TdThreadAttr thattr;
......@@ -1084,7 +1084,7 @@ TEST(rcTest, shortExecshortDelay) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0;
......@@ -1168,7 +1168,7 @@ TEST(rcTest, longExecshortDelay) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 1000000;
......@@ -1254,7 +1254,7 @@ TEST(rcTest, shortExeclongDelay) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0;
......@@ -1338,7 +1338,7 @@ TEST(rcTest, dropTest) {
SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer;
msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
tsem_init(&qwtTestQuerySem, 0, 0);
......
......@@ -9,7 +9,7 @@ target_include_directories(
target_link_libraries(
scheduler
PUBLIC os util nodes planner qcom common catalog transport command
PUBLIC os util nodes planner qcom common catalog transport command qworker executor
)
if(${BUILD_TEST})
......
......@@ -151,6 +151,7 @@ typedef struct SSchedulerMgmt {
SSchStat stat;
SRWLatch hbLock;
SHashObj *hbConnections;
void *queryMgmt;
} SSchedulerMgmt;
typedef struct SSchCallbackParamHeader {
......@@ -235,8 +236,10 @@ typedef struct SSchTask {
typedef struct SSchJobAttr {
EExplainMode explainMode;
bool queryJob;
bool insertJob;
bool needFetch;
bool needFlowCtrl;
bool localExec;
} SSchJobAttr;
typedef struct {
......@@ -263,7 +266,7 @@ typedef struct SSchJob {
SHashObj *taskList;
SHashObj *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask*
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
SExplainCtx *explainCtx;
int8_t status;
SQueryNodeAddr resNode;
......@@ -304,6 +307,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task))
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && (!SCH_IS_DATA_BIND_QRY_TASK(_task)))
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
......@@ -326,8 +331,9 @@ extern SSchedulerMgmt schMgmt;
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } else { (_job)->attr.insertJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_IS_INSERT_JOB(_job) ((_job)->attr.insertJob)
#define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch)
#define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
......@@ -502,6 +508,8 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode);
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode);
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode);
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync);
int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp);
extern SSchDebug gSCHDebug;
......
......@@ -720,6 +720,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
}
pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
pJob->attr.localExec = pReq->localReq;
pJob->conn = *pReq->pConn;
if (pReq->sql) {
pJob->sql = strdup(pReq->sql);
......
......@@ -72,6 +72,71 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
return TSDB_CODE_SUCCESS;
}
int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
int32_t code = 0;
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (SCH_IS_EXPLAIN_JOB(pJob)) {
if (rsp->completed) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schLaunchFetchTask(pJob));
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS;
}
if (pJob->fetchRes) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
atomic_store_ptr(&pJob->fetchRes, rsp);
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
}
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);
msg = NULL;
schProcessOnDataFetched(pJob);
_return:
taosMemoryFreeClear(msg);
SCH_RET(code);
}
int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_RET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp));
if (pRsp) {
SCH_ERR_RET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
return TSDB_CODE_SUCCESS;
}
// Note: no more task error processing, handled in function internal
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
......@@ -301,65 +366,20 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SExplainRsp rsp = {0};
if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
taosMemoryFree(rsp.subplanInfo);
tFreeSExplainRsp(&rsp);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp));
SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
taosMemoryFreeClear(msg);
break;
}
case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (SCH_IS_EXPLAIN_JOB(pJob)) {
if (rsp->completed) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schLaunchFetchTask(pJob));
taosMemoryFreeClear(msg);
return TSDB_CODE_SUCCESS;
}
if (pJob->fetchRes) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes);
taosMemoryFreeClear(rsp);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
atomic_store_ptr(&pJob->fetchRes, rsp);
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
}
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);
code = schProcessFetchRsp(pJob, pTask, msg, rspCode);
msg = NULL;
schProcessOnDataFetched(pJob);
SCH_ERR_JRET(code);
break;
}
case TDMT_SCH_DROP_TASK_RSP: {
......
......@@ -20,6 +20,7 @@
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "qworker.h"
#include "tglobal.h"
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
......@@ -90,6 +91,10 @@ _return:
}
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
return TSDB_CODE_SUCCESS;
}
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
......@@ -230,7 +235,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
SCH_RET(errCode);
}
// Note: no more task error processing, handled in function internal
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
bool moved = false;
int32_t code = 0;
......@@ -295,6 +299,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
.execId = pTask->execId,
.addr = pTask->succeedAddr,
.fetchMsgType = SCH_FETCH_TYPE(pTask),
.localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
};
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &parent->planLock);
......@@ -825,6 +830,120 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
return TSDB_CODE_SUCCESS;
}
int32_t schHandleExplainRes(SArray *pExplainRes) {
int32_t code = 0;
int32_t resNum = taosArrayGetSize(pExplainRes);
if (resNum <= 0) {
goto _return;
}
SSchTask *pTask = NULL;
SSchJob *pJob = NULL;
for (int32_t i = 0; i < resNum; ++i) {
SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i);
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId);
pJob = schAcquireJob(localRsp->rId);
if (NULL == pJob) {
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId, localRsp->tId, localRsp->rId);
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
}
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
schReleaseJob(pJob->refId);
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
}
code = schGetTaskInJob(pJob, localRsp->tId, &pTask);
if (TSDB_CODE_SUCCESS == code) {
code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
}
schReleaseJob(pJob->refId);
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId, localRsp->tId, code);
SCH_ERR_JRET(code);
localRsp->rsp.numOfPlans = 0;
localRsp->rsp.subplanInfo = NULL;
pTask = NULL;
pJob = NULL;
}
_return:
for (int32_t i = 0; i < resNum; ++i) {
SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i);
tFreeSExplainRsp(&localRsp->rsp);
}
taosArrayDestroy(pExplainRes);
SCH_RET(code);
}
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
SSubplan *plan = pTask->plan;
int32_t code = 0;
if (NULL == pTask->msg) { // TODO add more detailed reason for failure
code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
pTask->msgLen);
SCH_ERR_RET(code);
} else if (tsQueryPlannerTrace) {
char *msg = NULL;
int32_t msgLen = 0;
qSubPlanToString(plan, &msg, &msgLen);
SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
taosMemoryFree(msg);
}
}
SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
}
SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
}
int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
//SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
if (NULL == schMgmt.queryMgmt) {
SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL));
}
SArray *explainRes = NULL;
SQWMsg qwMsg = {0};
qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask);
qwMsg.msg = pTask->plan;
qwMsg.msgType = pTask->plan->msgType;
qwMsg.connInfo.handle = pJob->conn.pTrans;
if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
}
SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes));
}
SCH_RET(schProcessOnTaskSuccess(pJob, pTask));
}
int32_t schLaunchTaskImpl(void *param) {
SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
SSchJob *pJob = schAcquireJob(pCtx->jobRid);
......@@ -848,7 +967,8 @@ int32_t schLaunchTaskImpl(void *param) {
pTask->retryTimes++;
pTask->waitRetry = false;
SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes);
SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d", SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE",
pTask->execId, pTask->retryTimes);
SCH_LOG_TASK_START_TS(pTask);
......@@ -863,31 +983,12 @@ int32_t schLaunchTaskImpl(void *param) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
}
SSubplan *plan = pTask->plan;
if (NULL == pTask->msg) { // TODO add more detailed reason for failure
code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen);
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
pTask->msgLen);
SCH_ERR_JRET(code);
} else if (tsQueryPlannerTrace) {
char *msg = NULL;
int32_t msgLen = 0;
qSubPlanToString(plan, &msg, &msgLen);
SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
taosMemoryFree(msg);
}
}
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask));
if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) {
SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask));
} else {
SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask));
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
_return:
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
......@@ -980,6 +1081,29 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
}
}
int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask)));
}
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
void *pRsp = NULL;
SArray *explainRes = NULL;
if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
}
SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes));
}
SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));
return TSDB_CODE_SUCCESS;
}
// Note: no more error processing, handled in function internal
int32_t schLaunchFetchTask(SSchJob *pJob) {
int32_t code = 0;
......@@ -990,7 +1114,11 @@ int32_t schLaunchFetchTask(SSchJob *pJob) {
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask)));
if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
} else {
SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask));
}
return TSDB_CODE_SUCCESS;
......
......@@ -17,6 +17,7 @@
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "qworker.h"
SSchedulerMgmt schMgmt = {
.jobRef = -1,
......@@ -192,4 +193,7 @@ void schedulerDestroy(void) {
schMgmt.hbConnections = NULL;
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
qWorkerDestroy(&schMgmt.queryMgmt);
schMgmt.queryMgmt = NULL;
}
......@@ -517,3 +517,94 @@ python3 ./test.py -f 2-query/tsbsQuery.py -Q 3
python3 ./test.py -f 2-query/sml.py -Q 3
python3 ./test.py -f 2-query/interp.py -Q 3
#------------querPolicy 4-----------
python3 ./test.py -f 2-query/between.py -Q 4
python3 ./test.py -f 2-query/distinct.py -Q 4
python3 ./test.py -f 2-query/varchar.py -Q 4
python3 ./test.py -f 2-query/ltrim.py -Q 4
python3 ./test.py -f 2-query/rtrim.py -Q 4
python3 ./test.py -f 2-query/length.py -Q 4
python3 ./test.py -f 2-query/char_length.py -Q 4
python3 ./test.py -f 2-query/upper.py -Q 4
python3 ./test.py -f 2-query/lower.py -Q 4
python3 ./test.py -f 2-query/join.py -Q 4
python3 ./test.py -f 2-query/join2.py -Q 4
python3 ./test.py -f 2-query/cast.py -Q 4
python3 ./test.py -f 2-query/substr.py -Q 4
python3 ./test.py -f 2-query/union.py -Q 4
python3 ./test.py -f 2-query/union1.py -Q 4
python3 ./test.py -f 2-query/concat.py -Q 4
python3 ./test.py -f 2-query/concat2.py -Q 4
python3 ./test.py -f 2-query/concat_ws.py -Q 4
python3 ./test.py -f 2-query/concat_ws2.py -Q 4
#python3 ./test.py -f 2-query/check_tsdb.py -Q 4
python3 ./test.py -f 2-query/spread.py -Q 4
python3 ./test.py -f 2-query/hyperloglog.py -Q 4
python3 ./test.py -f 2-query/explain.py -Q 4
python3 ./test.py -f 2-query/leastsquares.py -Q 4
python3 ./test.py -f 2-query/timezone.py -Q 4
python3 ./test.py -f 2-query/Now.py -Q 4
python3 ./test.py -f 2-query/Today.py -Q 4
python3 ./test.py -f 2-query/max.py -Q 4
python3 ./test.py -f 2-query/min.py -Q 4
python3 ./test.py -f 2-query/count.py -Q 4
#python3 ./test.py -f 2-query/last.py -Q 4
python3 ./test.py -f 2-query/first.py -Q 4
python3 ./test.py -f 2-query/To_iso8601.py -Q 4
python3 ./test.py -f 2-query/To_unixtimestamp.py -Q 4
python3 ./test.py -f 2-query/timetruncate.py -Q 4
python3 ./test.py -f 2-query/diff.py -Q 4
python3 ./test.py -f 2-query/Timediff.py -Q 4
python3 ./test.py -f 2-query/json_tag.py -Q 4
python3 ./test.py -f 2-query/top.py -Q 4
python3 ./test.py -f 2-query/bottom.py -Q 4
python3 ./test.py -f 2-query/percentile.py -Q 4
python3 ./test.py -f 2-query/apercentile.py -Q 4
python3 ./test.py -f 2-query/abs.py -Q 4
python3 ./test.py -f 2-query/ceil.py -Q 4
python3 ./test.py -f 2-query/floor.py -Q 4
python3 ./test.py -f 2-query/round.py -Q 4
python3 ./test.py -f 2-query/log.py -Q 4
python3 ./test.py -f 2-query/pow.py -Q 4
python3 ./test.py -f 2-query/sqrt.py -Q 4
python3 ./test.py -f 2-query/sin.py -Q 4
python3 ./test.py -f 2-query/cos.py -Q 4
python3 ./test.py -f 2-query/tan.py -Q 4
python3 ./test.py -f 2-query/arcsin.py -Q 4
python3 ./test.py -f 2-query/arccos.py -Q 4
python3 ./test.py -f 2-query/arctan.py -Q 4
python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 4
# python3 ./test.py -f 2-query/nestedQuery.py -Q 4
# python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
# python3 ./test.py -f 2-query/avg.py -Q 4
# python3 ./test.py -f 2-query/elapsed.py -Q 4
python3 ./test.py -f 2-query/csum.py -Q 4
#python3 ./test.py -f 2-query/mavg.py -Q 4
python3 ./test.py -f 2-query/sample.py -Q 4
python3 ./test.py -f 2-query/function_diff.py -Q 4
python3 ./test.py -f 2-query/unique.py -Q 4
python3 ./test.py -f 2-query/stateduration.py -Q 4
python3 ./test.py -f 2-query/function_stateduration.py -Q 4
python3 ./test.py -f 2-query/statecount.py -Q 4
python3 ./test.py -f 2-query/tail.py -Q 4
python3 ./test.py -f 2-query/ttl_comment.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_count.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_max.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_min.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_sum.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_spread.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_apercentile.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_avg.py -Q 4
python3 ./test.py -f 2-query/distribute_agg_stddev.py -Q 4
python3 ./test.py -f 2-query/twa.py -Q 4
python3 ./test.py -f 2-query/irate.py -Q 4
python3 ./test.py -f 2-query/function_null.py -Q 4
python3 ./test.py -f 2-query/count_partition.py -Q 4
python3 ./test.py -f 2-query/max_partition.py -Q 4
python3 ./test.py -f 2-query/last_row.py -Q 4
python3 ./test.py -f 2-query/tsbsQuery.py -Q 4
#python3 ./test.py -f 2-query/sml.py -Q 4
python3 ./test.py -f 2-query/interp.py -Q 4
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册