提交 9e44816b 编写于 作者: D dapan1121

enh: support client local merge policy

上级 1427a8f3
......@@ -1412,6 +1412,14 @@ typedef struct {
SExplainExecInfo* subplanInfo;
} SExplainRsp;
typedef struct {
SExplainRsp rsp;
uint64_t qId;
uint64_t tId;
int64_t rId;
int32_t eId;
} SExplainLocalRsp;
typedef struct STableScanAnalyzeInfo {
uint64_t totalRows;
uint64_t totalCheckedRows;
......@@ -1426,6 +1434,7 @@ typedef struct STableScanAnalyzeInfo {
int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
void tFreeSExplainRsp(SExplainRsp *pRsp);
typedef struct {
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
......
......@@ -29,11 +29,12 @@ typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan;
typedef int32_t (*localFetchFp)(void *handle, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp);
typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*);
typedef struct {
void *handle;
void *handle;
localFetchFp fp;
SArray *explainRes;
} SLocalFetch;
typedef struct {
......
......@@ -99,9 +99,9 @@ void qWorkerDestroy(void **qWorkerMgmt);
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat);
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg);
int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes);
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp);
int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes);
#ifdef __cplusplus
}
......
......@@ -4334,7 +4334,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfPlans) < 0) return -1;
if (pRsp->numOfPlans > 0) {
pRsp->subplanInfo = taosMemoryMalloc(pRsp->numOfPlans * sizeof(SExplainExecInfo));
pRsp->subplanInfo = taosMemoryCalloc(pRsp->numOfPlans, sizeof(SExplainExecInfo));
if (pRsp->subplanInfo == NULL) return -1;
}
for (int32_t i = 0; i < pRsp->numOfPlans; ++i) {
......@@ -4342,7 +4342,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
if (tDecodeDouble(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].numOfRows) < 0) return -1;
if (tDecodeU32(&decoder, &pRsp->subplanInfo[i].verboseLen) < 0) return -1;
if (tDecodeBinary(&decoder, (uint8_t **)&pRsp->subplanInfo[i].verboseInfo, &pRsp->subplanInfo[i].verboseLen) < 0)
if (tDecodeBinaryAlloc(&decoder, &pRsp->subplanInfo[i].verboseInfo, NULL) < 0)
return -1;
}
......@@ -4352,6 +4352,19 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) {
return 0;
}
void tFreeSExplainRsp(SExplainRsp *pRsp) {
if (NULL == pRsp) {
return;
}
for (int32_t i = 0; i < pRsp->numOfPlans; ++i) {
SExplainExecInfo *pExec = pRsp->subplanInfo + i;
taosMemoryFree(pExec->verboseInfo);
}
taosMemoryFreeClear(pRsp->subplanInfo);
}
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
......
......@@ -56,7 +56,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) {
int32_t num = taosArrayGetSize(group->nodeExecInfo);
for (int32_t i = 0; i < num; ++i) {
SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i);
taosMemoryFreeClear(rsp->subplanInfo);
tFreeSExplainRsp(rsp);
}
}
......@@ -1723,7 +1723,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t
SExplainGroup *group = taosHashGet(ctx->groupHash, &groupId, sizeof(groupId));
if (NULL == group) {
qError("group %d not in groupHash", groupId);
taosMemoryFreeClear(pRspMsg->subplanInfo);
tFreeSExplainRsp(pRspMsg);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -1732,7 +1732,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t
group->nodeExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainRsp));
if (NULL == group->nodeExecInfo) {
qError("taosArrayInit %d explainExecInfo failed", group->nodeNum);
taosMemoryFreeClear(pRspMsg->subplanInfo);
tFreeSExplainRsp(pRspMsg);
taosWUnLockLatch(&group->lock);
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1742,7 +1742,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t
} else if (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum) {
qError("group execInfo already full, size:%d, nodeNum:%d", (int32_t)taosArrayGetSize(group->nodeExecInfo),
group->nodeNum);
taosMemoryFreeClear(pRspMsg->subplanInfo);
tFreeSExplainRsp(pRspMsg);
taosWUnLockLatch(&group->lock);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
......@@ -1751,13 +1751,14 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t
if (group->physiPlanExecNum != pRspMsg->numOfPlans) {
qError("physiPlanExecNum %d mismatch with others %d in group %d", pRspMsg->numOfPlans, group->physiPlanExecNum,
groupId);
taosMemoryFreeClear(pRspMsg->subplanInfo);
tFreeSExplainRsp(pRspMsg);
taosWUnLockLatch(&group->lock);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
taosArrayPush(group->nodeExecInfo, pRspMsg);
groupDone = (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum);
taosWUnLockLatch(&group->lock);
......
......@@ -455,8 +455,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL
int64_t threadId = taosGetSelfPthreadId();
if (pLocal) {
pTaskInfo->localFetch.handle = pLocal->handle;
pTaskInfo->localFetch.fp = pLocal->fp;
memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
}
taosArrayClearEx(pResList, freeBlock);
......
......@@ -2005,7 +2005,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
if (pSource->localExec) {
SDataBuf pBuf = {0};
int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData);
int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
loadRemoteDataCallback(pWrapper, &pBuf, code);
} else {
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu,
......
......@@ -58,7 +58,17 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList));
if (ctx->localExec) {
SExplainLocalRsp localRsp = {0};
localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList);
SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo));
memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
localRsp.rsp.subplanInfo = pExec;
localRsp.qId = qId;
localRsp.tId = tId;
localRsp.rId = rId;
localRsp.eId = eId;
taosArrayPush(ctx->explainRes, &localRsp);
taosArrayDestroy(execInfoList);
} else {
SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
connInfo.ahandle = NULL;
......@@ -84,7 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
int32_t execNum = 0;
qTaskInfo_t taskHandle = ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle;
SLocalFetch localFetch = {(void*)mgmt, qWorkerProcessLocalFetch};
SLocalFetch localFetch = {(void*)mgmt, qWorkerProcessLocalFetch, ctx->explainRes};
SArray *pResList = taosArrayInit(4, POINTER_BYTES);
while (true) {
......
......@@ -505,6 +505,7 @@ int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode);
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode);
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync);
int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp);
extern SSchDebug gSCHDebug;
......
......@@ -128,10 +128,10 @@ _return:
int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp));
SCH_ERR_RET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
SCH_ERR_RET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
return TSDB_CODE_SUCCESS;
......@@ -366,7 +366,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SExplainRsp rsp = {0};
if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
taosMemoryFree(rsp.subplanInfo);
tFreeSExplainRsp(&rsp);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......
......@@ -824,6 +824,53 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
return TSDB_CODE_SUCCESS;
}
int32_t schHandleExplainRes(SArray *pExplainRes) {
int32_t code = 0;
int32_t resNum = taosArrayGetSize(pExplainRes);
if (resNum <= 0) {
goto _return;
}
SSchTask *pTask = NULL;
SSchJob *pJob = NULL;
for (int32_t i = 0; i < resNum; ++i) {
SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i);
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg");
SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, localRsp->qId, localRsp->rId, localRsp->tId));
code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
if (pTask) {
SCH_UNLOCK_TASK(pTask);
}
if (pJob) {
schReleaseJob(pJob->refId);
}
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg");
SCH_ERR_JRET(code);
localRsp->rsp.numOfPlans = 0;
localRsp->rsp.subplanInfo = NULL;
}
_return:
for (int32_t i = 0; i < resNum; ++i) {
SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i);
tFreeSExplainRsp(&localRsp->rsp);
}
taosArrayDestroy(pExplainRes);
SCH_RET(code);
}
int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
SSubplan *plan = pTask->plan;
int32_t code = 0;
......@@ -864,11 +911,15 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
qwMsg.connInfo.handle = pJob->conn.pTrans;
if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, POINTER_BYTES);
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
}
SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes));
}
SCH_RET(schProcessOnTaskSuccess(pJob, pTask));
}
......@@ -1008,10 +1059,14 @@ int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
SArray *explainRes = NULL;
if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, POINTER_BYTES);
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
}
SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes));
}
SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册