diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index c5a57b02a6f67ed06dd457043d47fe5feff91c8e..78214ce14da0099c832268fdc66be3fd86ed869d 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -73,7 +73,7 @@ struct SVnode { SVnodeSync* pSync; SVnodeFS* pFs; tsem_t canCommit; - void* pQuery; + SQHandle* pQuery; }; int vnodeScheduleTask(SVnodeTask* task); diff --git a/source/dnode/vnode/impl/inc/vnodeQuery.h b/source/dnode/vnode/impl/inc/vnodeQuery.h index 59bab42f62cfa98ff925e9c771e4b525e13690f9..d43f5b1cf1e4958baff459f05062a62c9521f2c9 100644 --- a/source/dnode/vnode/impl/inc/vnodeQuery.h +++ b/source/dnode/vnode/impl/inc/vnodeQuery.h @@ -22,6 +22,9 @@ extern "C" { #include "vnodeInt.h" #include "qworker.h" +typedef struct SQWorkerMgmt SQHandle; + + int vnodeQueryOpen(SVnode *pVnode); #ifdef __cplusplus diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 825e891c8b745fd27f35f950314c552469375a3f..07ca91729d289f4234f8a1f8553c50f86f7e3745 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -102,8 +102,29 @@ typedef struct SQWorkerMgmt { #define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) -#define QW_LOCK(type, _lock) (QW_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) -#define QW_UNLOCK(type, _lock) (QW_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) +#define QW_LOCK(type, _lock) do { \ + if (QW_READ == (type)) { \ + if ((*(_lock)) < 0) assert(0); \ + taosRLockLatch(_lock); \ + qDebug("RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } else { \ + if ((*(_lock)) < 0) assert(0); \ + taosWLockLatch(_lock); \ + qDebug("WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } \ +} while (0) + +#define QW_UNLOCK(type, _lock) do { \ + if (QW_READ == (type)) { \ + if ((*(_lock)) <= 0) assert(0); \ + taosRUnLockLatch(_lock); \ + qDebug("RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } else { \ + if ((*(_lock)) <= 0) assert(0); \ + taosWUnLockLatch(_lock); \ + qDebug("WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + } \ +} while (0) static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 7a861a0b8f37b831259b7dc36c385c7b71453d71..4296e82a5617b2847c02011fb76b5f808602cff2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -601,6 +601,7 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { } SRpcMsg rpcRsp = { + .msgType = pMsg->msgType + 1, .handle = pMsg->handle, .ahandle = pMsg->ahandle, .pCont = pRsp, @@ -703,10 +704,8 @@ _return: if (task) { QW_UNLOCK(QW_WRITE, &task->lock); - } - - if (sch) { qwReleaseTask(QW_READ, sch); + } qwReleaseScheduler(QW_READ, mgmt); @@ -742,9 +741,6 @@ _return: if (task) { QW_UNLOCK(QW_WRITE, &task->lock); - } - - if (sch) { qwReleaseTask(QW_READ, sch); } @@ -849,10 +845,10 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, u _return: if (task) { QW_UNLOCK(QW_READ, &task->lock); + qwReleaseTask(QW_READ, sch); } if (sch) { - qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); } diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index d63d40b4f42601fb9c3541ce5cc8a3d6961ab129..7bc1c4ff40f49024d3c5b69470f66740bf25c1ab 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -36,11 +36,21 @@ namespace { +bool testStop = false; + int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { return 0; } void qwtRpcSendResponse(const SRpcMsg *pRsp) { + if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) { + SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont; + printf("task num:%d\n", rsp->num); + for (int32_t i = 0; i < rsp->num; ++i) { + STaskStatus *task = &rsp->status[i]; + printf("qId:%"PRIx64",tId:%"PRIx64",status:%d\n", task->queryId, task->taskId, task->status); + } + } return; } @@ -72,12 +82,135 @@ void stubSetRpcSendResponse() { } } +void *queryThread(void *param) { + SRpcMsg queryRpc = {0}; + int32_t code = 0; + uint32_t n = 0; + void *mockPointer = (void *)0x1; + void *mgmt = param; + SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); + queryMsg->queryId = htobe64(1); + queryMsg->sId = htobe64(1); + queryMsg->taskId = htobe64(1); + queryMsg->contentLen = htonl(100); + queryRpc.pCont = queryMsg; + queryRpc.contLen = sizeof(SSubQueryMsg) + 100; + + while (!testStop) { + qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + usleep(rand()%5); + if (++n % 50000 == 0) { + printf("query:%d\n", n); + } + } + + return NULL; +} + +void *readyThread(void *param) { + SRpcMsg readyRpc = {0}; + int32_t code = 0; + uint32_t n = 0; + void *mockPointer = (void *)0x1; + void *mgmt = param; + SResReadyMsg readyMsg = {0}; + readyMsg.sId = htobe64(1); + readyMsg.queryId = htobe64(1); + readyMsg.taskId = htobe64(1); + readyRpc.pCont = &readyMsg; + readyRpc.contLen = sizeof(SResReadyMsg); + + while (!testStop) { + code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); + usleep(rand()%5); + if (++n % 50000 == 0) { + printf("ready:%d\n", n); + } + } + + return NULL; +} + +void *fetchThread(void *param) { + SRpcMsg fetchRpc = {0}; + int32_t code = 0; + uint32_t n = 0; + void *mockPointer = (void *)0x1; + void *mgmt = param; + SResFetchMsg fetchMsg = {0}; + fetchMsg.sId = htobe64(1); + fetchMsg.queryId = htobe64(1); + fetchMsg.taskId = htobe64(1); + fetchRpc.pCont = &fetchMsg; + fetchRpc.contLen = sizeof(SResFetchMsg); + + while (!testStop) { + code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); + usleep(rand()%5); + if (++n % 50000 == 0) { + printf("fetch:%d\n", n); + } + } + + return NULL; +} + +void *dropThread(void *param) { + SRpcMsg dropRpc = {0}; + int32_t code = 0; + uint32_t n = 0; + void *mockPointer = (void *)0x1; + void *mgmt = param; + STaskDropMsg dropMsg = {0}; + dropMsg.sId = htobe64(1); + dropMsg.queryId = htobe64(1); + dropMsg.taskId = htobe64(1); + dropRpc.pCont = &dropMsg; + dropRpc.contLen = sizeof(STaskDropMsg); + + while (!testStop) { + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + usleep(rand()%5); + if (++n % 50000 == 0) { + printf("drop:%d\n", n); + } + } + + return NULL; +} + +void *statusThread(void *param) { + SRpcMsg statusRpc = {0}; + int32_t code = 0; + uint32_t n = 0; + void *mockPointer = (void *)0x1; + void *mgmt = param; + SSchTasksStatusMsg statusMsg = {0}; + statusMsg.sId = htobe64(1); + statusRpc.pCont = &statusMsg; + statusRpc.contLen = sizeof(SSchTasksStatusMsg); + statusRpc.msgType = TDMT_VND_TASKS_STATUS; + + while (!testStop) { + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + usleep(rand()%5); + if (++n % 50000 == 0) { + printf("status:%d\n", n); + } + } + + return NULL; +} + + + } -TEST(testCase, normalCase) { +TEST(seqTest, normalCase) { void *mgmt = NULL; int32_t code = 0; void *mockPointer = (void *)0x1; @@ -85,6 +218,8 @@ TEST(testCase, normalCase) { SRpcMsg readyRpc = {0}; SRpcMsg fetchRpc = {0}; SRpcMsg dropRpc = {0}; + SRpcMsg statusRpc = {0}; + SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); queryMsg->queryId = htobe64(1); queryMsg->sId = htobe64(1); @@ -114,24 +249,222 @@ TEST(testCase, normalCase) { dropRpc.pCont = &dropMsg; dropRpc.contLen = sizeof(STaskDropMsg); + SSchTasksStatusMsg statusMsg = {0}; + statusMsg.sId = htobe64(1); + statusRpc.pCont = &statusMsg; + statusRpc.contLen = sizeof(SSchTasksStatusMsg); + statusRpc.msgType = TDMT_VND_TASKS_STATUS; + stubSetStringToPlan(); stubSetRpcSendResponse(); code = qWorkerInit(NULL, &mgmt); ASSERT_EQ(code, 0); + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); ASSERT_EQ(code, 0); + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); ASSERT_EQ(code, 0); + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); ASSERT_EQ(code, 0); + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + ASSERT_EQ(code, 0); + + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + + qWorkerDestroy(&mgmt); +} + +TEST(seqTest, cancelFirst) { + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + SRpcMsg queryRpc = {0}; + SRpcMsg dropRpc = {0}; + SRpcMsg statusRpc = {0}; + + SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); + queryMsg->queryId = htobe64(1); + queryMsg->sId = htobe64(1); + queryMsg->taskId = htobe64(1); + queryMsg->contentLen = htonl(100); + queryRpc.pCont = queryMsg; + queryRpc.contLen = sizeof(SSubQueryMsg) + 100; + + STaskDropMsg dropMsg = {0}; + dropMsg.sId = htobe64(1); + dropMsg.queryId = htobe64(1); + dropMsg.taskId = htobe64(1); + dropRpc.pCont = &dropMsg; + dropRpc.contLen = sizeof(STaskDropMsg); + + SSchTasksStatusMsg statusMsg = {0}; + statusMsg.sId = htobe64(1); + statusRpc.pCont = &statusMsg; + statusRpc.contLen = sizeof(SSchTasksStatusMsg); + statusRpc.msgType = TDMT_VND_TASKS_STATUS; + + stubSetStringToPlan(); + stubSetRpcSendResponse(); + + code = qWorkerInit(NULL, &mgmt); + ASSERT_EQ(code, 0); + + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); ASSERT_EQ(code, 0); + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + + code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + ASSERT_EQ(code, 0); + + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + + qWorkerDestroy(&mgmt); +} + +TEST(seqTest, randCase) { + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + SRpcMsg queryRpc = {0}; + SRpcMsg readyRpc = {0}; + SRpcMsg fetchRpc = {0}; + SRpcMsg dropRpc = {0}; + SRpcMsg statusRpc = {0}; + + SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); + queryMsg->queryId = htobe64(1); + queryMsg->sId = htobe64(1); + queryMsg->taskId = htobe64(1); + queryMsg->contentLen = htonl(100); + queryRpc.pCont = queryMsg; + queryRpc.contLen = sizeof(SSubQueryMsg) + 100; + + SResReadyMsg readyMsg = {0}; + readyMsg.sId = htobe64(1); + readyMsg.queryId = htobe64(1); + readyMsg.taskId = htobe64(1); + readyRpc.pCont = &readyMsg; + readyRpc.contLen = sizeof(SResReadyMsg); + + SResFetchMsg fetchMsg = {0}; + fetchMsg.sId = htobe64(1); + fetchMsg.queryId = htobe64(1); + fetchMsg.taskId = htobe64(1); + fetchRpc.pCont = &fetchMsg; + fetchRpc.contLen = sizeof(SResFetchMsg); + + STaskDropMsg dropMsg = {0}; + dropMsg.sId = htobe64(1); + dropMsg.queryId = htobe64(1); + dropMsg.taskId = htobe64(1); + dropRpc.pCont = &dropMsg; + dropRpc.contLen = sizeof(STaskDropMsg); + + SSchTasksStatusMsg statusMsg = {0}; + statusMsg.sId = htobe64(1); + statusRpc.pCont = &statusMsg; + statusRpc.contLen = sizeof(SSchTasksStatusMsg); + statusRpc.msgType = TDMT_VND_TASKS_STATUS; + + stubSetStringToPlan(); + stubSetRpcSendResponse(); + + srand(time(NULL)); + + code = qWorkerInit(NULL, &mgmt); + ASSERT_EQ(code, 0); + + int32_t t = 0; + int32_t maxr = 10001; + while (true) { + int32_t r = rand() % maxr; + + if (r >= 0 && r < maxr/5) { + printf("Query,%d\n", t++); + code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + } else if (r >= maxr/5 && r < maxr * 2/5) { + printf("Ready,%d\n", t++); + code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); + } else if (r >= maxr * 2/5 && r < maxr* 3/5) { + printf("Fetch,%d\n", t++); + code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); + } else if (r >= maxr * 3/5 && r < maxr * 4/5) { + printf("Drop,%d\n", t++); + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + } else if (r >= maxr * 4/5 && r < maxr-1) { + printf("Status,%d\n", t++); + statusMsg.sId = htobe64(1); + code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); + ASSERT_EQ(code, 0); + } else { + printf("QUIT RAND NOW"); + break; + } + } + + qWorkerDestroy(&mgmt); +} + +TEST(seqTest, multithreadRand) { + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + + stubSetStringToPlan(); + stubSetRpcSendResponse(); + + srand(time(NULL)); + + code = qWorkerInit(NULL, &mgmt); + ASSERT_EQ(code, 0); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t t1,t2,t3,t4,t5; + pthread_create(&(t1), &thattr, queryThread, mgmt); + pthread_create(&(t2), &thattr, readyThread, NULL); + pthread_create(&(t3), &thattr, fetchThread, NULL); + pthread_create(&(t4), &thattr, dropThread, NULL); + pthread_create(&(t5), &thattr, statusThread, NULL); + + int32_t t = 0; + int32_t maxr = 10001; + sleep(300); + testStop = true; + sleep(1); + qWorkerDestroy(&mgmt); } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index d5833c3adf1d95dbe7cb1e98ec1f0e2db62c8424..c327e4cfead62c33da5d63af0651d868f2c56b34 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -43,6 +43,11 @@ typedef struct SSchedulerMgmt { SHashObj *jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; +typedef struct SSchCallbackParam { + uint64_t queryId; + uint64_t taskId; +} SSchCallbackParam; + typedef struct SSchLevel { int32_t level; int8_t status; @@ -120,6 +125,7 @@ typedef struct SSchJob { extern int32_t schLaunchTask(SSchJob *job, SSchTask *task); +extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType); #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index e298cd65f6743cee013146034e5277351f0f7ccc..4cde24e38cf9baeef87344b5ab4e21c9e21ac1c4 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -21,36 +21,6 @@ SSchedulerMgmt schMgmt = {0}; -int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) { -/* - SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT); - if (pRequest == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - SRequestMsgBody body = {0}; - buildConnectMsg(pRequest, &body); - - int64_t transporterId = 0; - asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); - - tsem_wait(&pRequest->body.rspSem); - destroyConnectMsg(&body); - - if (pRequest->code != TSDB_CODE_SUCCESS) { - const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno); - printf("failed to connect to server, reason: %s\n\n", errorMsg); - - destroyRequest(pRequest); - taos_close(pTscObj); - pTscObj = NULL; - } else { - tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter); - destroyRequest(pRequest); - } -*/ -} - int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { for (int32_t i = 0; i < job->levelNum; ++i) { SSchLevel *level = taosArrayGet(job->levels, i); @@ -312,100 +282,6 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { return TSDB_CODE_SUCCESS; } - -int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { - int32_t msgSize = 0; - void *msg = NULL; - - switch (msgType) { - case TDMT_VND_SUBMIT: { - if (NULL == task->msg || task->msgLen <= 0) { - qError("submit msg is NULL"); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); - } - - msgSize = task->msgLen; - msg = task->msg; - break; - } - case TDMT_VND_QUERY: { - if (NULL == task->msg) { - qError("query msg is NULL"); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); - } - - msgSize = sizeof(SSubQueryMsg) + task->msgLen; - msg = calloc(1, msgSize); - if (NULL == msg) { - qError("calloc %d failed", msgSize); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - SSubQueryMsg *pMsg = msg; - - pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); - pMsg->contentLen = htonl(task->msgLen); - memcpy(pMsg->msg, task->msg, task->msgLen); - break; - } - case TDMT_VND_RES_READY: { - msgSize = sizeof(SResReadyMsg); - msg = calloc(1, msgSize); - if (NULL == msg) { - qError("calloc %d failed", msgSize); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - SResReadyMsg *pMsg = msg; - pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); - break; - } - case TDMT_VND_FETCH: { - if (NULL == task) { - SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); - } - msgSize = sizeof(SResFetchMsg); - msg = calloc(1, msgSize); - if (NULL == msg) { - qError("calloc %d failed", msgSize); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - SResFetchMsg *pMsg = msg; - pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); - break; - } - case TDMT_VND_DROP_TASK:{ - msgSize = sizeof(STaskDropMsg); - msg = calloc(1, msgSize); - if (NULL == msg) { - qError("calloc %d failed", msgSize); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - STaskDropMsg *pMsg = msg; - pMsg->sId = htobe64(schMgmt.sId); - pMsg->queryId = htobe64(job->queryId); - pMsg->taskId = htobe64(task->taskId); - break; - } - default: - qError("unknown msg type:%d", msgType); - break; - } - - //TODO SEND MSG - //taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code); - - return TSDB_CODE_SUCCESS; -} - int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO if needRetry, set task retry info @@ -424,7 +300,7 @@ int32_t schFetchFromRemote(SSchJob *job) { return TSDB_CODE_SUCCESS; } - SCH_ERR_JRET(schAsyncSendMsg(job, job->fetchTask, TDMT_VND_FETCH)); + SCH_ERR_JRET(schBuildAndSendMsg(job, job->fetchTask, TDMT_VND_FETCH)); return TSDB_CODE_SUCCESS; @@ -577,11 +453,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { return TSDB_CODE_SUCCESS; } -int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { +int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; switch (msgType) { - case TDMT_VND_SUBMIT: { + case TDMT_VND_SUBMIT_RSP: { SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; if (rsp->code != TSDB_CODE_SUCCESS) { SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); @@ -595,20 +471,20 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg } break; } - case TDMT_VND_QUERY: { + case TDMT_VND_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)msg; if (rsp->code != TSDB_CODE_SUCCESS) { SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); } else { - code = schAsyncSendMsg(job, task, TDMT_VND_RES_READY); + code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY); if (code) { goto _task_error; } } break; } - case TDMT_VND_RES_READY: { + case TDMT_VND_RES_READY_RSP: { SResReadyRsp *rsp = (SResReadyRsp *)msg; if (rsp->code != TSDB_CODE_SUCCESS) { @@ -621,7 +497,7 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg } break; } - case TDMT_VND_FETCH: { + case TDMT_VND_FETCH_RSP: { SCH_ERR_JRET(rspCode); SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; @@ -631,6 +507,9 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg SCH_ERR_JRET(schProcessOnDataFetched(job)); break; } + case TDMT_VND_DROP_TASK: { + + } default: qError("unknown msg type:%d received", msgType); return TSDB_CODE_QRY_INVALID_INPUT; @@ -648,6 +527,211 @@ _return: } +int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { + int32_t code = 0; + SSchCallbackParam *pParam = (SSchCallbackParam *)param; + + SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); + if (NULL == job || NULL == (*job)) { + qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); + if (NULL == task || NULL == (*task)) { + qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode); + +_return: + tfree(param); + + SCH_RET(code); +} + +int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { + return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code); +} +int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { + return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); +} +int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { + return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code); +} +int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { + return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code); +} +int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { + SSchCallbackParam *pParam = (SSchCallbackParam *)param; + qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code); +} + +int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { + switch (msgType) { + case TDMT_VND_SUBMIT: + *fp = schHandleSubmitCallback; + break; + case TDMT_VND_QUERY: + *fp = schHandleQueryCallback; + break; + case TDMT_VND_RES_READY: + *fp = schHandleReadyCallback; + break; + case TDMT_VND_FETCH: + *fp = schHandleFetchCallback; + break; + case TDMT_VND_DROP_TASK: + *fp = schHandleDropCallback; + break; + default: + qError("unknown msg type:%d", msgType); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { + int32_t code = 0; + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam)); + if (NULL == param) { + qError("calloc %d failed", (int32_t)sizeof(SSchCallbackParam)); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + __async_send_cb_fn_t fp = NULL; + SCH_ERR_JRET(schGetCallbackFp(msgType, &fp)); + + param->queryId = qId; + param->taskId = tId; + + pMsgSendInfo->param = param; + pMsgSendInfo->msgInfo.pData = msg; + pMsgSendInfo->msgInfo.len = msgSize; + pMsgSendInfo->msgType = msgType; + + pMsgSendInfo->fp = fp; + + int64_t transporterId = 0; + SCH_ERR_JRET(asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo)); + + return TSDB_CODE_SUCCESS; + +_return: + tfree(param); + tfree(pMsgSendInfo); + + SCH_RET(code); +} + + +int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { + uint32_t msgSize = 0; + void *msg = NULL; + int32_t code = 0; + + switch (msgType) { + case TDMT_VND_SUBMIT: { + if (NULL == task->msg || task->msgLen <= 0) { + qError("submit msg is NULL"); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + msgSize = task->msgLen; + msg = task->msg; + break; + } + case TDMT_VND_QUERY: { + if (NULL == task->msg) { + qError("query msg is NULL"); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + msgSize = sizeof(SSubQueryMsg) + task->msgLen; + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SSubQueryMsg *pMsg = msg; + + pMsg->sId = htobe64(schMgmt.sId); + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); + pMsg->contentLen = htonl(task->msgLen); + memcpy(pMsg->msg, task->msg, task->msgLen); + break; + } + case TDMT_VND_RES_READY: { + msgSize = sizeof(SResReadyMsg); + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SResReadyMsg *pMsg = msg; + pMsg->sId = htobe64(schMgmt.sId); + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); + break; + } + case TDMT_VND_FETCH: { + if (NULL == task) { + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + msgSize = sizeof(SResFetchMsg); + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SResFetchMsg *pMsg = msg; + pMsg->sId = htobe64(schMgmt.sId); + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); + break; + } + case TDMT_VND_DROP_TASK:{ + msgSize = sizeof(STaskDropMsg); + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + STaskDropMsg *pMsg = msg; + pMsg->sId = htobe64(schMgmt.sId); + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); + break; + } + default: + qError("unknown msg type:%d", msgType); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + break; + } + + SCH_ERR_JRET(schAsyncSendMsg(job->transport, &task->plan->execEpSet, job->queryId, task->taskId, msgType, msg, msgSize)); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(msg); + SCH_RET(code); +} int32_t schLaunchTask(SSchJob *job, SSchTask *task) { @@ -664,7 +748,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TDMT_VND_SUBMIT : TDMT_VND_QUERY; - SCH_ERR_RET(schAsyncSendMsg(job, task, msgType)); + SCH_ERR_RET(schBuildAndSendMsg(job, task, msgType)); SCH_ERR_RET(schPushTaskToExecList(job, task)); @@ -673,6 +757,8 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } + + int32_t schLaunchJob(SSchJob *job) { SSchLevel *level = taosArrayGet(job->levels, job->levelIdx); for (int32_t i = 0; i < level->taskNum; ++i) { @@ -690,7 +776,7 @@ void schDropJobAllTasks(SSchJob *job) { while (pIter) { SSchTask *task = *(SSchTask **)pIter; - schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK); + schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK); pIter = taosHashIterate(job->succTasks, pIter); } @@ -699,7 +785,7 @@ void schDropJobAllTasks(SSchJob *job) { while (pIter) { SSchTask *task = *(SSchTask **)pIter; - schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK); + schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK); pIter = taosHashIterate(job->succTasks, pIter); } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 4732429d0bb62e30bf4ff02eb19c14ec66183b23..6163bc0c1a4d22bc1b612a4218f8a485ff9cd802 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -36,7 +36,7 @@ namespace { -extern "C" int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); +extern "C" int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); void schtBuildQueryDag(SQueryDag *dag) { uint64_t qId = 0x0000000000000001; @@ -182,7 +182,7 @@ void *schtSendRsp(void *param) { SShellSubmitRspMsg rsp = {0}; rsp.affectedRows = 10; - schHandleRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); + schProcessRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); pIter = taosHashIterate(job->execTasks, pIter); } @@ -227,7 +227,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + code = schProcessRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -238,7 +238,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; - code = schHandleRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); + code = schProcessRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -249,7 +249,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + code = schProcessRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -260,7 +260,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; - code = schHandleRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); + code = schProcessRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -269,7 +269,7 @@ TEST(queryTest, normalCase) { SRetrieveTableRsp rsp = {0}; rsp.completed = 1; rsp.numOfRows = 10; - code = schHandleRspMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0); + code = schProcessRspMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0);