提交 ed5b2832 编写于 作者: D dapan1121

feature/scheduler

上级 f6b5bfae
...@@ -81,16 +81,16 @@ typedef struct SRpcInit { ...@@ -81,16 +81,16 @@ typedef struct SRpcInit {
} SRpcInit; } SRpcInit;
typedef struct { typedef struct {
void * val; void *val;
int32_t len; int32_t (*clone)(void *src, void **dst);
void (*free)(void *arg); void (*free)(void *arg);
} SRpcCtxVal; } SRpcCtxVal;
typedef struct { typedef struct {
int32_t msgType; int32_t msgType;
void * val; void *val;
int32_t len; int32_t (*clone)(void *src, void **dst);
void (*free)(void *arg); void (*free)(void *arg);
} SRpcBrokenlinkVal; } SRpcBrokenlinkVal;
typedef struct { typedef struct {
......
...@@ -185,6 +185,10 @@ static void doDestroyRequest(void *p) { ...@@ -185,6 +185,10 @@ static void doDestroyRequest(void *p) {
doFreeReqResultInfo(&pRequest->body.resInfo); doFreeReqResultInfo(&pRequest->body.resInfo);
qDestroyQueryPlan(pRequest->body.pDag); qDestroyQueryPlan(pRequest->body.pDag);
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
if (pRequest->body.showInfo.pArray != NULL) { if (pRequest->body.showInfo.pArray != NULL) {
taosArrayDestroy(pRequest->body.showInfo.pArray); taosArrayDestroy(pRequest->body.showInfo.pArray);
} }
......
...@@ -330,6 +330,8 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { ...@@ -330,6 +330,8 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
pRequest->code = code; pRequest->code = code;
break; break;
} }
destroyRequest(pRequest);
} }
return pRequest; return pRequest;
......
...@@ -241,6 +241,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); ...@@ -241,6 +241,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schFetchFromRemote(SSchJob *pJob); int32_t schFetchFromRemote(SSchJob *pJob);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId); int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId);
int32_t schCloneSMsgSendInfo(void *src, void **dst);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -1172,6 +1172,8 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) { ...@@ -1172,6 +1172,8 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
} }
// TODO // TODO
SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId, jobTaskStatusStr(taskStatus->status));
schReleaseJob(taskStatus->refId); schReleaseJob(taskStatus->refId);
} }
...@@ -1309,7 +1311,7 @@ int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal * ...@@ -1309,7 +1311,7 @@ int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *
brokenVal->msgType = msgType; brokenVal->msgType = msgType;
brokenVal->val = pMsgSendInfo; brokenVal->val = pMsgSendInfo;
brokenVal->len = sizeof(SMsgSendInfo); brokenVal->clone = schCloneSMsgSendInfo;
brokenVal->free = schFreeRpcCtxVal; brokenVal->free = schFreeRpcCtxVal;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1357,7 +1359,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { ...@@ -1357,7 +1359,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
pMsgSendInfo->param = param; pMsgSendInfo->param = param;
pMsgSendInfo->fp = fp; pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal}; SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .free = schFreeRpcCtxVal};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -1414,7 +1416,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { ...@@ -1414,7 +1416,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
pMsgSendInfo->param = param; pMsgSendInfo->param = param;
pMsgSendInfo->fp = fp; pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .len = sizeof(SMsgSendInfo), .free = schFreeRpcCtxVal}; SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .free = schFreeRpcCtxVal};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -1486,26 +1488,27 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe ...@@ -1486,26 +1488,27 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schCloneSMsgSendInfo(SMsgSendInfo *pSrc, SMsgSendInfo **pDst) { int32_t schCloneSMsgSendInfo(void *src, void **dst) {
SMsgSendInfo *pSrc = src;
int32_t code = 0; int32_t code = 0;
SMsgSendInfo *dst = malloc(sizeof(*pSrc)); SMsgSendInfo *pDst = malloc(sizeof(*pSrc));
if (NULL == dst) { if (NULL == pDst) {
qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc)); qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
memcpy(dst, pSrc, sizeof(*pSrc)); memcpy(pDst, pSrc, sizeof(*pSrc));
dst->param = NULL; pDst->param = NULL;
SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&dst->param)); SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
*pDst = dst; *dst = pDst;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
tfree(dst); tfree(pDst);
SCH_RET(code); SCH_RET(code);
} }
...@@ -1514,7 +1517,7 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) { ...@@ -1514,7 +1517,7 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
memcpy(&pDst->brokenVal, &pSrc->brokenVal, sizeof(pSrc->brokenVal)); memcpy(&pDst->brokenVal, &pSrc->brokenVal, sizeof(pSrc->brokenVal));
pDst->brokenVal.val = NULL; pDst->brokenVal.val = NULL;
SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, (SMsgSendInfo **)&pDst->brokenVal.val)); SCH_ERR_RET(schCloneSMsgSendInfo(pSrc->brokenVal.val, &pDst->brokenVal.val));
pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); pDst->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
if (NULL == pDst->args) { if (NULL == pDst->args) {
...@@ -1528,9 +1531,12 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) { ...@@ -1528,9 +1531,12 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
SRpcCtxVal *pVal = (SRpcCtxVal *)pIter; SRpcCtxVal *pVal = (SRpcCtxVal *)pIter;
int32_t *msgType = taosHashGetKey(pIter, NULL); int32_t *msgType = taosHashGetKey(pIter, NULL);
SCH_ERR_JRET(schCloneSMsgSendInfo(pVal->val, (SMsgSendInfo **)&dst.val)); dst = *pVal;
dst.val = NULL;
SCH_ERR_JRET(schCloneSMsgSendInfo(pVal->val, &dst.val));
if (taosHashPut(pDst->args, msgType, sizeof(*msgType), pVal, sizeof(*pVal))) { if (taosHashPut(pDst->args, msgType, sizeof(*msgType), &dst, sizeof(dst))) {
qError("taosHashPut msg %d to rpcCtx failed", *msgType); qError("taosHashPut msg %d to rpcCtx failed", *msgType);
(*dst.free)(dst.val); (*dst.free)(dst.val);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -2047,7 +2053,7 @@ void schFreeJobImpl(void *job) { ...@@ -2047,7 +2053,7 @@ void schFreeJobImpl(void *job) {
taosArrayDestroy(pJob->nodeList); taosArrayDestroy(pJob->nodeList);
tfree(pJob->resData); tfree(pJob->resData);
tfree(pJob); free(pJob);
qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob); qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
} }
......
...@@ -273,17 +273,17 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) { ...@@ -273,17 +273,17 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key) {
if (cVal == NULL) { if (cVal == NULL) {
return NULL; return NULL;
} }
char* ret = calloc(1, cVal->len); void *ret = NULL;
memcpy(ret, (char*)cVal->val, cVal->len); (*cVal->clone)(cVal->val, &ret);
return (void*)ret; return ret;
} }
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) { void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
char* ret = calloc(1, ctx->brokenVal.len); void *ret = NULL;
(*ctx->brokenVal.clone)(ctx->brokenVal.val, &ret);
memcpy(ret, (char*)(ctx->brokenVal.val), ctx->brokenVal.len);
*msgType = ctx->brokenVal.msgType; *msgType = ctx->brokenVal.msgType;
return (void*)ret; return ret;
} }
void transQueueInit(STransQueue* queue, void (*free)(void* arg)) { void transQueueInit(STransQueue* queue, void (*free)(void* arg)) {
......
...@@ -156,17 +156,15 @@ TEST_F(TransCtxEnv, mergeTest) { ...@@ -156,17 +156,15 @@ TEST_F(TransCtxEnv, mergeTest) {
STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx));
transCtxInit(src); transCtxInit(src);
{ {
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; STransCtxVal val1 = {.val = NULL, .free = free};
val1.val = malloc(12); val1.val = malloc(12);
val1.len = 12;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++; key++;
} }
{ {
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; STransCtxVal val1 = {.val = NULL, .free = free};
val1.val = malloc(12); val1.val = malloc(12);
val1.len = 12;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++; key++;
} }
...@@ -178,17 +176,15 @@ TEST_F(TransCtxEnv, mergeTest) { ...@@ -178,17 +176,15 @@ TEST_F(TransCtxEnv, mergeTest) {
STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx));
transCtxInit(src); transCtxInit(src);
{ {
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; STransCtxVal val1 = {.val = NULL, .free = free};
val1.val = malloc(12); val1.val = malloc(12);
val1.len = 12;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++; key++;
} }
{ {
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; STransCtxVal val1 = {.val = NULL, .free = free};
val1.val = malloc(12); val1.val = malloc(12);
val1.len = 12;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++; key++;
} }
...@@ -202,19 +198,17 @@ TEST_F(TransCtxEnv, mergeTest) { ...@@ -202,19 +198,17 @@ TEST_F(TransCtxEnv, mergeTest) {
STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx)); STransCtx *src = (STransCtx *)calloc(1, sizeof(STransCtx));
transCtxInit(src); transCtxInit(src);
{ {
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; STransCtxVal val1 = {.val = NULL, .free = free};
val1.val = calloc(1, 11); val1.val = calloc(1, 11);
memcpy(val1.val, val.c_str(), val.size()); memcpy(val1.val, val.c_str(), val.size());
val1.len = 11;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++; key++;
} }
{ {
STransCtxVal val1 = {.val = NULL, .len = 0, .free = free}; STransCtxVal val1 = {.val = NULL, .free = free};
val1.val = calloc(1, 11); val1.val = calloc(1, 11);
memcpy(val1.val, val.c_str(), val.size()); memcpy(val1.val, val.c_str(), val.size());
val1.len = 11;
taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1)); taosHashPut(src->args, &key, sizeof(key), &val1, sizeof(val1));
key++; key++;
} }
......
...@@ -358,6 +358,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { ...@@ -358,6 +358,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
} else { } else {
printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6); printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
} }
taos_free_result(pSql);
} else { } else {
int num_rows_affacted = taos_affected_rows(pSql); int num_rows_affacted = taos_affected_rows(pSql);
taos_free_result(pSql); taos_free_result(pSql);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册