提交 64d21e48 编写于 作者: D dapan1121

feature/qnode

上级 5d10a784
......@@ -73,7 +73,7 @@ struct SVnode {
SVnodeSync* pSync;
SVnodeFS* pFs;
tsem_t canCommit;
void* pQuery;
SQHandle* pQuery;
};
int vnodeScheduleTask(SVnodeTask* task);
......
......@@ -22,6 +22,9 @@ extern "C" {
#include "vnodeInt.h"
#include "qworker.h"
typedef struct SQWorkerMgmt SQHandle;
int vnodeQueryOpen(SVnode *pVnode);
#ifdef __cplusplus
......
......@@ -102,8 +102,29 @@ typedef struct SQWorkerMgmt {
#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define QW_LOCK(type, _lock) (QW_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define QW_UNLOCK(type, _lock) (QW_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
#define QW_LOCK(type, _lock) do { \
if (QW_READ == (type)) { \
if ((*(_lock)) < 0) assert(0); \
taosRLockLatch(_lock); \
qDebug("RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} else { \
if ((*(_lock)) < 0) assert(0); \
taosWLockLatch(_lock); \
qDebug("WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} \
} while (0)
#define QW_UNLOCK(type, _lock) do { \
if (QW_READ == (type)) { \
if ((*(_lock)) <= 0) assert(0); \
taosRUnLockLatch(_lock); \
qDebug("RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} else { \
if ((*(_lock)) <= 0) assert(0); \
taosWUnLockLatch(_lock); \
qDebug("WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \
} \
} while (0)
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt);
......
......@@ -601,6 +601,7 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
}
SRpcMsg rpcRsp = {
.msgType = pMsg->msgType + 1,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
......@@ -703,10 +704,8 @@ _return:
if (task) {
QW_UNLOCK(QW_WRITE, &task->lock);
}
if (sch) {
qwReleaseTask(QW_READ, sch);
}
qwReleaseScheduler(QW_READ, mgmt);
......@@ -742,9 +741,6 @@ _return:
if (task) {
QW_UNLOCK(QW_WRITE, &task->lock);
}
if (sch) {
qwReleaseTask(QW_READ, sch);
}
......@@ -849,10 +845,10 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, u
_return:
if (task) {
QW_UNLOCK(QW_READ, &task->lock);
qwReleaseTask(QW_READ, sch);
}
if (sch) {
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
}
......
......@@ -36,11 +36,21 @@
namespace {
bool testStop = false;
int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
return 0;
}
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) {
SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont;
printf("task num:%d\n", rsp->num);
for (int32_t i = 0; i < rsp->num; ++i) {
STaskStatus *task = &rsp->status[i];
printf("qId:%"PRIx64",tId:%"PRIx64",status:%d\n", task->queryId, task->taskId, task->status);
}
}
return;
}
......@@ -72,12 +82,135 @@ void stubSetRpcSendResponse() {
}
}
void *queryThread(void *param) {
SRpcMsg queryRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
while (!testStop) {
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
printf("query:%d\n", n);
}
}
return NULL;
}
void *readyThread(void *param) {
SRpcMsg readyRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SResReadyMsg readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyMsg);
while (!testStop) {
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
printf("ready:%d\n", n);
}
}
return NULL;
}
void *fetchThread(void *param) {
SRpcMsg fetchRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SResFetchMsg fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchMsg);
while (!testStop) {
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
printf("fetch:%d\n", n);
}
}
return NULL;
}
void *dropThread(void *param) {
SRpcMsg dropRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
STaskDropMsg dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
while (!testStop) {
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
printf("drop:%d\n", n);
}
}
return NULL;
}
void *statusThread(void *param) {
SRpcMsg statusRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SSchTasksStatusMsg statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
while (!testStop) {
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
printf("status:%d\n", n);
}
}
return NULL;
}
}
TEST(testCase, normalCase) {
TEST(seqTest, normalCase) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
......@@ -85,6 +218,8 @@ TEST(testCase, normalCase) {
SRpcMsg readyRpc = {0};
SRpcMsg fetchRpc = {0};
SRpcMsg dropRpc = {0};
SRpcMsg statusRpc = {0};
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
......@@ -114,24 +249,222 @@ TEST(testCase, normalCase) {
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
SSchTasksStatusMsg statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan();
stubSetRpcSendResponse();
code = qWorkerInit(NULL, &mgmt);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
qWorkerDestroy(&mgmt);
}
TEST(seqTest, cancelFirst) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
SRpcMsg queryRpc = {0};
SRpcMsg dropRpc = {0};
SRpcMsg statusRpc = {0};
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
STaskDropMsg dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
SSchTasksStatusMsg statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan();
stubSetRpcSendResponse();
code = qWorkerInit(NULL, &mgmt);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
qWorkerDestroy(&mgmt);
}
TEST(seqTest, randCase) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
SRpcMsg queryRpc = {0};
SRpcMsg readyRpc = {0};
SRpcMsg fetchRpc = {0};
SRpcMsg dropRpc = {0};
SRpcMsg statusRpc = {0};
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
SResReadyMsg readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyMsg);
SResFetchMsg fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchMsg);
STaskDropMsg dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
SSchTasksStatusMsg statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan();
stubSetRpcSendResponse();
srand(time(NULL));
code = qWorkerInit(NULL, &mgmt);
ASSERT_EQ(code, 0);
int32_t t = 0;
int32_t maxr = 10001;
while (true) {
int32_t r = rand() % maxr;
if (r >= 0 && r < maxr/5) {
printf("Query,%d\n", t++);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
} else if (r >= maxr/5 && r < maxr * 2/5) {
printf("Ready,%d\n", t++);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
} else if (r >= maxr * 2/5 && r < maxr* 3/5) {
printf("Fetch,%d\n", t++);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
} else if (r >= maxr * 3/5 && r < maxr * 4/5) {
printf("Drop,%d\n", t++);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
} else if (r >= maxr * 4/5 && r < maxr-1) {
printf("Status,%d\n", t++);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
} else {
printf("QUIT RAND NOW");
break;
}
}
qWorkerDestroy(&mgmt);
}
TEST(seqTest, multithreadRand) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
stubSetStringToPlan();
stubSetRpcSendResponse();
srand(time(NULL));
code = qWorkerInit(NULL, &mgmt);
ASSERT_EQ(code, 0);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t t1,t2,t3,t4,t5;
pthread_create(&(t1), &thattr, queryThread, mgmt);
pthread_create(&(t2), &thattr, readyThread, NULL);
pthread_create(&(t3), &thattr, fetchThread, NULL);
pthread_create(&(t4), &thattr, dropThread, NULL);
pthread_create(&(t5), &thattr, statusThread, NULL);
int32_t t = 0;
int32_t maxr = 10001;
sleep(300);
testStop = true;
sleep(1);
qWorkerDestroy(&mgmt);
}
......
......@@ -43,6 +43,11 @@ typedef struct SSchedulerMgmt {
SHashObj *jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt;
typedef struct SSchCallbackParam {
uint64_t queryId;
uint64_t taskId;
} SSchCallbackParam;
typedef struct SSchLevel {
int32_t level;
int8_t status;
......@@ -120,6 +125,7 @@ typedef struct SSchJob {
extern int32_t schLaunchTask(SSchJob *job, SSchTask *task);
extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType);
#ifdef __cplusplus
}
......
......@@ -21,36 +21,6 @@
SSchedulerMgmt schMgmt = {0};
int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) {
/*
SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT);
if (pRequest == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SRequestMsgBody body = {0};
buildConnectMsg(pRequest, &body);
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
tsem_wait(&pRequest->body.rspSem);
destroyConnectMsg(&body);
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
printf("failed to connect to server, reason: %s\n\n", errorMsg);
destroyRequest(pRequest);
taos_close(pTscObj);
pTscObj = NULL;
} else {
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
destroyRequest(pRequest);
}
*/
}
int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
for (int32_t i = 0; i < job->levelNum; ++i) {
SSchLevel *level = taosArrayGet(job->levels, i);
......@@ -312,100 +282,6 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
return TSDB_CODE_SUCCESS;
}
int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
int32_t msgSize = 0;
void *msg = NULL;
switch (msgType) {
case TDMT_VND_SUBMIT: {
if (NULL == task->msg || task->msgLen <= 0) {
qError("submit msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = task->msgLen;
msg = task->msg;
break;
}
case TDMT_VND_QUERY: {
if (NULL == task->msg) {
qError("query msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = sizeof(SSubQueryMsg) + task->msgLen;
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSubQueryMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
pMsg->contentLen = htonl(task->msgLen);
memcpy(pMsg->msg, task->msg, task->msgLen);
break;
}
case TDMT_VND_RES_READY: {
msgSize = sizeof(SResReadyMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResReadyMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
case TDMT_VND_FETCH: {
if (NULL == task) {
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
msgSize = sizeof(SResFetchMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResFetchMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
case TDMT_VND_DROP_TASK:{
msgSize = sizeof(STaskDropMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
STaskDropMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
default:
qError("unknown msg type:%d", msgType);
break;
}
//TODO SEND MSG
//taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code);
return TSDB_CODE_SUCCESS;
}
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO if needRetry, set task retry info
......@@ -424,7 +300,7 @@ int32_t schFetchFromRemote(SSchJob *job) {
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schAsyncSendMsg(job, job->fetchTask, TDMT_VND_FETCH));
SCH_ERR_JRET(schBuildAndSendMsg(job, job->fetchTask, TDMT_VND_FETCH));
return TSDB_CODE_SUCCESS;
......@@ -577,11 +453,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
return TSDB_CODE_SUCCESS;
}
int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0;
switch (msgType) {
case TDMT_VND_SUBMIT: {
case TDMT_VND_SUBMIT_RSP: {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
......@@ -595,20 +471,20 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg
}
break;
}
case TDMT_VND_QUERY: {
case TDMT_VND_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
} else {
code = schAsyncSendMsg(job, task, TDMT_VND_RES_READY);
code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY);
if (code) {
goto _task_error;
}
}
break;
}
case TDMT_VND_RES_READY: {
case TDMT_VND_RES_READY_RSP: {
SResReadyRsp *rsp = (SResReadyRsp *)msg;
if (rsp->code != TSDB_CODE_SUCCESS) {
......@@ -621,7 +497,7 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg
}
break;
}
case TDMT_VND_FETCH: {
case TDMT_VND_FETCH_RSP: {
SCH_ERR_JRET(rspCode);
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
......@@ -631,6 +507,9 @@ int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg
SCH_ERR_JRET(schProcessOnDataFetched(job));
break;
}
case TDMT_VND_DROP_TASK: {
}
default:
qError("unknown msg type:%d received", msgType);
return TSDB_CODE_QRY_INVALID_INPUT;
......@@ -648,6 +527,211 @@ _return:
}
int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
if (NULL == job || NULL == (*job)) {
qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == task || NULL == (*task)) {
qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode);
_return:
tfree(param);
SCH_RET(code);
}
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code);
}
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
switch (msgType) {
case TDMT_VND_SUBMIT:
*fp = schHandleSubmitCallback;
break;
case TDMT_VND_QUERY:
*fp = schHandleQueryCallback;
break;
case TDMT_VND_RES_READY:
*fp = schHandleReadyCallback;
break;
case TDMT_VND_FETCH:
*fp = schHandleFetchCallback;
break;
case TDMT_VND_DROP_TASK:
*fp = schHandleDropCallback;
break;
default:
qError("unknown msg type:%d", msgType);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
return TSDB_CODE_SUCCESS;
}
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t code = 0;
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam));
if (NULL == param) {
qError("calloc %d failed", (int32_t)sizeof(SSchCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
__async_send_cb_fn_t fp = NULL;
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
param->queryId = qId;
param->taskId = tId;
pMsgSendInfo->param = param;
pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = fp;
int64_t transporterId = 0;
SCH_ERR_JRET(asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo));
return TSDB_CODE_SUCCESS;
_return:
tfree(param);
tfree(pMsgSendInfo);
SCH_RET(code);
}
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
uint32_t msgSize = 0;
void *msg = NULL;
int32_t code = 0;
switch (msgType) {
case TDMT_VND_SUBMIT: {
if (NULL == task->msg || task->msgLen <= 0) {
qError("submit msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = task->msgLen;
msg = task->msg;
break;
}
case TDMT_VND_QUERY: {
if (NULL == task->msg) {
qError("query msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = sizeof(SSubQueryMsg) + task->msgLen;
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSubQueryMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
pMsg->contentLen = htonl(task->msgLen);
memcpy(pMsg->msg, task->msg, task->msgLen);
break;
}
case TDMT_VND_RES_READY: {
msgSize = sizeof(SResReadyMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResReadyMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
case TDMT_VND_FETCH: {
if (NULL == task) {
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
msgSize = sizeof(SResFetchMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResFetchMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
case TDMT_VND_DROP_TASK:{
msgSize = sizeof(STaskDropMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
STaskDropMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
default:
qError("unknown msg type:%d", msgType);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
}
SCH_ERR_JRET(schAsyncSendMsg(job->transport, &task->plan->execEpSet, job->queryId, task->taskId, msgType, msg, msgSize));
return TSDB_CODE_SUCCESS;
_return:
tfree(msg);
SCH_RET(code);
}
int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
......@@ -664,7 +748,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TDMT_VND_SUBMIT : TDMT_VND_QUERY;
SCH_ERR_RET(schAsyncSendMsg(job, task, msgType));
SCH_ERR_RET(schBuildAndSendMsg(job, task, msgType));
SCH_ERR_RET(schPushTaskToExecList(job, task));
......@@ -673,6 +757,8 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchJob(SSchJob *job) {
SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
for (int32_t i = 0; i < level->taskNum; ++i) {
......@@ -690,7 +776,7 @@ void schDropJobAllTasks(SSchJob *job) {
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK);
schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK);
pIter = taosHashIterate(job->succTasks, pIter);
}
......@@ -699,7 +785,7 @@ void schDropJobAllTasks(SSchJob *job) {
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
schAsyncSendMsg(job, task, TDMT_VND_DROP_TASK);
schBuildAndSendMsg(job, task, TDMT_VND_DROP_TASK);
pIter = taosHashIterate(job->succTasks, pIter);
}
......
......@@ -36,7 +36,7 @@
namespace {
extern "C" int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
extern "C" int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
void schtBuildQueryDag(SQueryDag *dag) {
uint64_t qId = 0x0000000000000001;
......@@ -182,7 +182,7 @@ void *schtSendRsp(void *param) {
SShellSubmitRspMsg rsp = {0};
rsp.affectedRows = 10;
schHandleRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);
schProcessRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);
pIter = taosHashIterate(job->execTasks, pIter);
}
......@@ -227,7 +227,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
code = schProcessRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
......@@ -238,7 +238,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
code = schHandleRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
code = schProcessRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
......@@ -249,7 +249,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
code = schProcessRspMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
......@@ -260,7 +260,7 @@ TEST(queryTest, normalCase) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
code = schHandleRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
code = schProcessRspMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(job->execTasks, pIter);
......@@ -269,7 +269,7 @@ TEST(queryTest, normalCase) {
SRetrieveTableRsp rsp = {0};
rsp.completed = 1;
rsp.numOfRows = 10;
code = schHandleRspMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0);
code = schProcessRspMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册