提交 79ce13e8 编写于 作者: D dapan

feature/qnode

上级 ff0200ae
...@@ -629,7 +629,9 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -629,7 +629,9 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
dropConnection = ctx->dropConnection; dropConnection = ctx->dropConnection;
// Note: ctx freed, no need to unlock it // Note: ctx freed, no need to unlock it
locked = false; locked = false;
break;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
...@@ -639,6 +641,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -639,6 +641,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_SET_RSP_CODE(ctx, output->rspCode); QW_SET_RSP_CODE(ctx, output->rspCode);
cancelConnection = ctx->cancelConnection; cancelConnection = ctx->cancelConnection;
break;
} }
if (ctx->rspCode) { if (ctx->rspCode) {
...@@ -1215,8 +1219,6 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -1215,8 +1219,6 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING)); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING));
ctx->dropConnection = qwMsg->connection;
} else if (ctx->phase > 0) { } else if (ctx->phase > 0) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE));
...@@ -1225,11 +1227,11 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -1225,11 +1227,11 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
locked = false; locked = false;
needRsp = true; needRsp = true;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
if (!needRsp) { if (!needRsp) {
ctx->dropConnection = qwMsg->connection;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
} }
...@@ -1239,6 +1241,10 @@ _return: ...@@ -1239,6 +1241,10 @@ _return:
QW_UPDATE_RSP_CODE(ctx, code); QW_UPDATE_RSP_CODE(ctx, code);
} }
if (locked) {
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
if (ctx) { if (ctx) {
qwReleaseTaskCtx(mgmt, ctx); qwReleaseTaskCtx(mgmt, ctx);
} }
......
...@@ -137,7 +137,7 @@ void qwtBuildStatusReqMsg(SSchTasksStatusReq *statusMsg, SRpcMsg *statusRpc) { ...@@ -137,7 +137,7 @@ void qwtBuildStatusReqMsg(SSchTasksStatusReq *statusMsg, SRpcMsg *statusRpc) {
} }
int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
*subplan = 0x1; *subplan = (SSubplan *)0x1;
return 0; return 0;
} }
...@@ -192,10 +192,10 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { ...@@ -192,10 +192,10 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
if (0 == pRsp->code) { if (0 == pRsp->code) {
qwtBuildReadyReqMsg(&qwtreadyMsg, &qwtreadyRpc); qwtBuildReadyReqMsg(&qwtreadyMsg, &qwtreadyRpc);
qwtPutReqToFetchQueue(0x1, &qwtreadyRpc); qwtPutReqToFetchQueue((void *)0x1, &qwtreadyRpc);
} else { } else {
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
qwtPutReqToFetchQueue(0x1, &qwtdropRpc); qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
} }
break; break;
...@@ -205,10 +205,10 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { ...@@ -205,10 +205,10 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
if (0 == pRsp->code) { if (0 == pRsp->code) {
qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
qwtPutReqToFetchQueue(0x1, &qwtfetchRpc); qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
} else { } else {
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
qwtPutReqToFetchQueue(0x1, &qwtdropRpc); qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
} }
break; break;
} }
...@@ -217,12 +217,12 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { ...@@ -217,12 +217,12 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
if (0 == pRsp->code && 0 == rsp->completed) { if (0 == pRsp->code && 0 == rsp->completed) {
qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
qwtPutReqToFetchQueue(0x1, &qwtfetchRpc); qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
return; return;
} }
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
qwtPutReqToFetchQueue(0x1, &qwtdropRpc); qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
break; break;
} }
...@@ -352,7 +352,7 @@ int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { ...@@ -352,7 +352,7 @@ int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
qwtTestSinkBlockNum--; qwtTestSinkBlockNum--;
pOutput->numOfRows = rand() % 10 + 1; pOutput->numOfRows = rand() % 10 + 1;
pOutput->compressed = 1; pOutput->compressed = 1;
pOutput->pData = malloc(pOutput->numOfRows); pOutput->pData = (char *)malloc(pOutput->numOfRows);
pOutput->queryEnd = qwtTestSinkQueryEnd; pOutput->queryEnd = qwtTestSinkQueryEnd;
if (qwtTestSinkBlockNum == 0) { if (qwtTestSinkBlockNum == 0) {
pOutput->bufStatus = DS_BUF_EMPTY; pOutput->bufStatus = DS_BUF_EMPTY;
...@@ -648,7 +648,7 @@ void *clientThread(void *param) { ...@@ -648,7 +648,7 @@ void *clientThread(void *param) {
qwtTestCaseFinished = false; qwtTestCaseFinished = false;
qwtBuildQueryReqMsg(&queryRpc); qwtBuildQueryReqMsg(&queryRpc);
qwtPutReqToQueue(0x1, &queryRpc); qwtPutReqToQueue((void *)0x1, &queryRpc);
while (!qwtTestCaseFinished) { while (!qwtTestCaseFinished) {
usleep(1); usleep(1);
...@@ -692,15 +692,16 @@ void *queryQueueThread(void *param) { ...@@ -692,15 +692,16 @@ void *queryQueueThread(void *param) {
taosWUnLockLatch(&qwtTestQueryQueueLock); taosWUnLockLatch(&qwtTestQueryQueueLock);
if (TDMT_VND_QUERY == queryRpc->msgType) { if (TDMT_VND_QUERY == queryRpc->msgType) {
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc);
} else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) { } else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) {
qWorkerProcessCQueryMsg(mockPointer, mgmt, &queryRpc) qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc);
} else { } else {
printf("unknown msg in query queue, type:%d\n", queryRpc->msgType); printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
assert(0); assert(0);
} }
} }
return NULL;
} }
void *fetchQueueThread(void *param) { void *fetchQueueThread(void *param) {
...@@ -743,6 +744,7 @@ void *fetchQueueThread(void *param) { ...@@ -743,6 +744,7 @@ void *fetchQueueThread(void *param) {
} }
} }
return NULL;
} }
......
...@@ -275,7 +275,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { ...@@ -275,7 +275,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
} }
int32_t schRecordTaskSucceedNode(SSchTask *pTask) { int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
int32_t idx = atomic_load_8(&pTask->candidateIdx); int32_t idx = atomic_load_8(&pTask->candidateIdx);
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, idx); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, idx);
if (NULL == addr) { if (NULL == addr) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册