提交 76b8abe2 编写于 作者: D dapan1121

feature/qnode

上级 c14b2463
......@@ -66,7 +66,6 @@ void schFreeTask(SSchTask* pTask) {
taosArrayDestroy(pTask->candidateAddrs);
}
// TODO NEED TO VERFY WITH ASYNC_SEND MEMORY FREE
tfree(pTask->msg);
if (pTask->children) {
......@@ -97,7 +96,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%d, rspType:%d", lastMsgType, msgType);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
......@@ -868,8 +867,18 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
if (pJob->res) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res);
tfree(rsp);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
atomic_store_ptr(&pJob->res, rsp);
atomic_store_32(&pJob->resNumOfRows, rsp->numOfRows);
if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
}
SCH_ERR_JRET(schProcessOnDataFetched(pJob));
......@@ -1067,7 +1076,13 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case TDMT_VND_CREATE_TABLE:
case TDMT_VND_SUBMIT: {
msgSize = pTask->msgLen;
msg = pTask->msg;
msg = calloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
memcpy(msg, pTask->msg, msgSize);
break;
}
......@@ -1549,29 +1564,24 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (status == JOB_TASK_STATUS_DROPPING) {
SCH_JOB_ELOG("job is dropping, status:%d", status);
atomic_sub_fetch_32(&pJob->ref, 1);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (!SCH_JOB_NEED_FETCH(&pJob->attr)) {
SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob));
atomic_sub_fetch_32(&pJob->ref, 1);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
atomic_sub_fetch_32(&pJob->ref, 1);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
*pData = atomic_load_ptr(&pJob->res);
atomic_store_ptr(&pJob->res, NULL);
SCH_JOB_ELOG("job failed or dropping, status:%d", status);
SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
} else if (status == JOB_TASK_STATUS_SUCCEED) {
*pData = atomic_load_ptr(&pJob->res);
atomic_store_ptr(&pJob->res, NULL);
SCH_JOB_ELOG("job already succeed, status:%d", status);
goto _return;
} else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_ERR_JRET(schFetchFromRemote(pJob));
......@@ -1582,14 +1592,16 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
status = SCH_GET_JOB_STATUS(pJob);
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
code = atomic_load_32(&pJob->errCode);
SCH_ERR_JRET(code);
SCH_JOB_ELOG("job failed or dropping, status:%d", status);
SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
}
if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) {
SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
}
_return:
while (true) {
*pData = atomic_load_ptr(&pJob->res);
......@@ -1600,10 +1612,19 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
break;
}
_return:
if (NULL == *pData) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp));
if (rsp) {
rsp->completed = 1;
}
*pData = rsp;
}
atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
SCH_JOB_DLOG("fetch done, code:%x", code);
atomic_sub_fetch_32(&pJob->ref, 1);
SCH_RET(code);
......@@ -1683,6 +1704,7 @@ void scheduleFreeJob(void *job) {
taosHashCleanup(pJob->succTasks);
taosArrayDestroy(pJob->levels);
taosArrayDestroy(pJob->nodeList);
tfree(pJob->res);
......
......@@ -49,7 +49,7 @@ uint64_t schtQueryId = 1;
bool schtTestStop = false;
bool schtTestDeadLoop = false;
int32_t schtTestMTRunSec = 60;
int32_t schtTestMTRunSec = 10;
int32_t schtTestPrintNum = 1000;
int32_t schtStartFetch = 0;
......@@ -187,8 +187,6 @@ void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int6
}
void schtSetPlanToString() {
static Stub stub;
stub.set(qSubPlanToString, schtPlanToString);
......@@ -228,7 +226,12 @@ void schtSetRpcSendRequest() {
}
}
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
if (pInfo) {
tfree(pInfo->param);
tfree(pInfo->msgInfo.pData);
free(pInfo);
}
return 0;
}
......@@ -284,7 +287,7 @@ void *schtCreateFetchRspThread(void *param) {
rsp->completed = 1;
rsp->numOfRows = 10;
code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(rsp), 0);
code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0);
assert(code == 0);
}
......@@ -344,12 +347,6 @@ void* schtRunJobThread(void *aa) {
schtInitLogFile();
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
SEpAddr qnodeAddr = {0};
strcpy(qnodeAddr.fqdn, "qnode0.ep");
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
int32_t code = schedulerInit(NULL);
assert(code == 0);
......@@ -368,6 +365,13 @@ void* schtRunJobThread(void *aa) {
while (!schtTestStop) {
schtBuildQueryDag(&dag);
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
SEpAddr qnodeAddr = {0};
strcpy(qnodeAddr.fqdn, "qnode0.ep");
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &job);
assert(code == 0);
......@@ -475,8 +479,6 @@ void* schtRunJobThread(void *aa) {
code = scheduleFetchRows(pQueryJob, &data);
assert(code == 0 || code);
assert(data == (void*)NULL);
schtFreeQueryJob(0);
taosHashCleanup(execTasks);
......@@ -496,7 +498,7 @@ void* schtRunJobThread(void *aa) {
void* schtFreeJobThread(void *aa) {
while (!schtTestStop) {
usleep(rand() % 2000);
usleep(rand() % 100);
schtFreeQueryJob(1);
}
}
......@@ -592,11 +594,12 @@ TEST(queryTest, normalCase) {
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
ASSERT_EQ(pRsp->completed, 1);
ASSERT_EQ(pRsp->numOfRows, 10);
tfree(data);
data = NULL;
code = scheduleFetchRows(job, &data);
ASSERT_EQ(code, 0);
ASSERT_EQ(data, (void*)NULL);
ASSERT_TRUE(data);
scheduleFreeJob(pJob);
......@@ -607,7 +610,6 @@ TEST(queryTest, normalCase) {
TEST(insertTest, normalCase) {
void *mockPointer = (void *)0x1;
char *clusterId = "cluster1";
......@@ -672,7 +674,6 @@ TEST(multiThread, forceFree) {
sleep(3);
}
int main(int argc, char** argv) {
srand(time(NULL));
testing::InitGoogleTest(&argc, argv);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册