From ed5b28329110a49a7668779d4142205ab9fb0da5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 23 Mar 2022 17:19:04 +0800 Subject: [PATCH] feature/scheduler --- include/libs/transport/trpc.h | 14 ++++---- source/client/src/clientEnv.c | 4 +++ source/client/src/clientImpl.c | 2 ++ source/libs/scheduler/inc/schedulerInt.h | 1 + source/libs/scheduler/src/scheduler.c | 36 ++++++++++++-------- source/libs/transport/src/transComm.c | 12 +++---- source/libs/transport/test/transportTests.cc | 18 ++++------ tools/shell/src/shellEngine.c | 1 + 8 files changed, 48 insertions(+), 40 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index c2cce3a05d..a506c6fe98 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -81,16 +81,16 @@ typedef struct SRpcInit { } SRpcInit; typedef struct { - void * val; - int32_t len; - void (*free)(void *arg); + void *val; + int32_t (*clone)(void *src, void **dst); + void (*free)(void *arg); } SRpcCtxVal; typedef struct { - int32_t msgType; - void * val; - int32_t len; - void (*free)(void *arg); + int32_t msgType; + void *val; + int32_t (*clone)(void *src, void **dst); + void (*free)(void *arg); } SRpcBrokenlinkVal; typedef struct { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 6ebf9e71e0..fd6a72962d 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -185,6 +185,10 @@ static void doDestroyRequest(void *p) { doFreeReqResultInfo(&pRequest->body.resInfo); qDestroyQueryPlan(pRequest->body.pDag); + if (pRequest->body.queryJob != 0) { + schedulerFreeJob(pRequest->body.queryJob); + } + if (pRequest->body.showInfo.pArray != NULL) { taosArrayDestroy(pRequest->body.showInfo.pArray); } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a7f15cbe45..70cd00ff11 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -330,6 +330,8 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { pRequest->code = code; break; } + + destroyRequest(pRequest); } return pRequest; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index fe27b85b40..9d973e4437 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -241,6 +241,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); int32_t schFetchFromRemote(SSchJob *pJob); int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId); +int32_t schCloneSMsgSendInfo(void *src, void **dst); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index ea12ba25d7..ca471326e0 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1172,6 +1172,8 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) { } // TODO + + SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId, jobTaskStatusStr(taskStatus->status)); schReleaseJob(taskStatus->refId); } @@ -1309,7 +1311,7 @@ int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal * brokenVal->msgType = msgType; brokenVal->val = pMsgSendInfo; - brokenVal->len = sizeof(SMsgSendInfo); + brokenVal->clone = schCloneSMsgSendInfo; brokenVal->free = schFreeRpcCtxVal; return TSDB_CODE_SUCCESS; @@ -1357,7 +1359,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { pMsgSendInfo->param = param; pMsgSendInfo->fp = fp; - SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal}; + SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .free = schFreeRpcCtxVal}; if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1414,7 +1416,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { pMsgSendInfo->param = param; pMsgSendInfo->fp = fp; - SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal}; + SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .free = schFreeRpcCtxVal}; if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1486,26 +1488,27 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe return TSDB_CODE_SUCCESS; } -int32_t schCloneSMsgSendInfo(SMsgSendInfo *pSrc, SMsgSendInfo **pDst) { +int32_t schCloneSMsgSendInfo(void *src, void **dst) { + SMsgSendInfo *pSrc = src; int32_t code = 0; - SMsgSendInfo *dst = malloc(sizeof(*pSrc)); - if (NULL == dst) { + SMsgSendInfo *pDst = malloc(sizeof(*pSrc)); + if (NULL == pDst) { qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - memcpy(dst, pSrc, sizeof(*pSrc)); - dst->param = NULL; + memcpy(pDst, pSrc, sizeof(*pSrc)); + pDst->param = NULL; - SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&dst->param)); + SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param)); - *pDst = dst; + *dst = pDst; return TSDB_CODE_SUCCESS; _return: - tfree(dst); + tfree(pDst); SCH_RET(code); } @@ -1514,7 +1517,7 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) { memcpy(&pDst->brokenVal, &pSrc->brokenVal, sizeof(pSrc->brokenVal)); pDst->brokenVal.val = NULL; - SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, (SMsgSendInfo **)&pDst->brokenVal.val)); + SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, &pDst->brokenVal.val)); pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); if (NULL == pDst->args) { @@ -1528,9 +1531,12 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) { SRpcCtxVal *pVal = (SRpcCtxVal *)pIter; int32_t *msgType = taosHashGetKey(pIter, NULL); - SCH_ERR_JRET(schCloneSMsgSendInfo(pVal->val, (SMsgSendInfo **)&dst.val)); + dst = *pVal; + dst.val = NULL; + + SCH_ERR_JRET(schCloneSMsgSendInfo(pVal->val, &dst.val)); - if (taosHashPut(pDst->args, msgType, sizeof(*msgType), pVal, sizeof(*pVal))) { + if (taosHashPut(pDst->args, msgType, sizeof(*msgType), &dst, sizeof(dst))) { qError("taosHashPut msg %d to rpcCtx failed", *msgType); (*dst.free)(dst.val); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -2047,7 +2053,7 @@ void schFreeJobImpl(void *job) { taosArrayDestroy(pJob->nodeList); tfree(pJob->resData); - tfree(pJob); + free(pJob); qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob); } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 87355ac8d0..6833594e7d 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -273,17 +273,17 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) { if (cVal == NULL) { return NULL; } - char* ret = calloc(1, cVal->len); - memcpy(ret, (char*)cVal->val, cVal->len); - return (void*)ret; + void *ret = NULL; + (*cVal->clone)(cVal->val, &ret); + return ret; } void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) { - char* ret = calloc(1, ctx->brokenVal.len); + void *ret = NULL; + (*ctx->brokenVal.clone)(ctx->brokenVal.val, &ret); - memcpy(ret, (char*)(ctx->brokenVal.val), ctx->brokenVal.len); *msgType = ctx->brokenVal.msgType; - return (void*)ret; + return ret; } void transQueueInit(STransQueue* queue, void (*free)(void* arg)) { diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc index 65d9302994..ad2da87435 100644 --- a/source/libs/transport/test/transportTests.cc +++ b/source/libs/transport/test/transportTests.cc @@ -156,17 +156,15 @@ TEST_F(TransCtxEnv, mergeTest) { STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); transCtxInit(src); { - STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + STransCtxVal val1 = {.val = NULL, .free = free}; val1.val = malloc(12); - val1.len = 12; taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); key++; } { - STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + STransCtxVal val1 = {.val = NULL, .free = free}; val1.val = malloc(12); - val1.len = 12; taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); key++; } @@ -178,17 +176,15 @@ TEST_F(TransCtxEnv, mergeTest) { STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); transCtxInit(src); { - STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + STransCtxVal val1 = {.val = NULL, .free = free}; val1.val = malloc(12); - val1.len = 12; taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); key++; } { - STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + STransCtxVal val1 = {.val = NULL, .free = free}; val1.val = malloc(12); - val1.len = 12; taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); key++; } @@ -202,19 +198,17 @@ TEST_F(TransCtxEnv, mergeTest) { STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); transCtxInit(src); { - STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + STransCtxVal val1 = {.val = NULL, .free = free}; val1.val = calloc(1, 11); memcpy(val1.val, val.c_str(), val.size()); - val1.len = 11; taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); key++; } { - STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; + STransCtxVal val1 = {.val = NULL, .free = free}; val1.val = calloc(1, 11); memcpy(val1.val, val.c_str(), val.size()); - val1.len = 11; taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); key++; } diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 1b35afb57d..c8fb901c0d 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -358,6 +358,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { } else { printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6); } + taos_free_result(pSql); } else { int num_rows_affacted = taos_affected_rows(pSql); taos_free_result(pSql); -- GitLab