提交 89dcffce 编写于 作者: D dapan1121

feature/qnode

上级 7850f2a9
...@@ -89,7 +89,7 @@ int32_t scheduleFetchRows(void *pJob, void **data); ...@@ -89,7 +89,7 @@ int32_t scheduleFetchRows(void *pJob, void **data);
* @param pJob * @param pJob
* @return * @return
*/ */
int32_t scheduleCancelJob(void *pJob); //int32_t scheduleCancelJob(void *pJob);
/** /**
* Free the query job * Free the query job
......
...@@ -87,24 +87,24 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m ...@@ -87,24 +87,24 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
int32_t code = 0; int32_t code = 0;
int8_t oriStatus = SCH_GET_JOB_STATUS(pJob); int8_t oriStatus = 0;
while (true) {
oriStatus = SCH_GET_JOB_STATUS(pJob);
/*
if (oriStatus == newStatus) { if (oriStatus == newStatus) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
switch (oriStatus) { switch (oriStatus) {
case JOB_TASK_STATUS_NULL: case JOB_TASK_STATUS_NULL:
if (newStatus != JOB_TASK_STATUS_EXECUTING if (newStatus != JOB_TASK_STATUS_NOT_START) {
&& newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_NOT_START) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
break; break;
case JOB_TASK_STATUS_NOT_START: case JOB_TASK_STATUS_NOT_START:
if (newStatus != JOB_TASK_STATUS_CANCELLED) { if (newStatus != JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -120,9 +120,9 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { ...@@ -120,9 +120,9 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break; break;
case JOB_TASK_STATUS_PARTIAL_SUCCEED: case JOB_TASK_STATUS_PARTIAL_SUCCEED:
if (newStatus != JOB_TASK_STATUS_EXECUTING if (newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_SUCCEED && newStatus != JOB_TASK_STATUS_SUCCEED
&& newStatus != JOB_TASK_STATUS_CANCELLED) { && newStatus != JOB_TASK_STATUS_DROPPING) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -130,7 +130,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { ...@@ -130,7 +130,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
case JOB_TASK_STATUS_SUCCEED: case JOB_TASK_STATUS_SUCCEED:
case JOB_TASK_STATUS_FAILED: case JOB_TASK_STATUS_FAILED:
case JOB_TASK_STATUS_CANCELLING: case JOB_TASK_STATUS_CANCELLING:
if (newStatus != JOB_TASK_STATUS_CANCELLED) { if (newStatus != JOB_TASK_STATUS_DROPPING) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -141,14 +141,18 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { ...@@ -141,14 +141,18 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break; break;
default: default:
qError("invalid task status:%d", oriStatus); SCH_JOB_ELOG("invalid job status:%d", oriStatus);
return TSDB_CODE_QRY_APP_ERROR; SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
*/
SCH_SET_JOB_STATUS(pJob, newStatus); if (oriStatus != atomic_val_compare_exchange_8(&pJob->status, oriStatus, newStatus)) {
continue;
}
SCH_JOB_DLOG("status updated from %d to %d", oriStatus, newStatus); SCH_JOB_DLOG("job status updated from %d to %d", oriStatus, newStatus);
break;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -507,6 +511,7 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b ...@@ -507,6 +511,7 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b
} }
// Note: callers need no further error handling; failures are processed inside this function
int32_t schFetchFromRemote(SSchJob *pJob) { int32_t schFetchFromRemote(SSchJob *pJob) {
int32_t code = 0; int32_t code = 0;
...@@ -515,7 +520,13 @@ int32_t schFetchFromRemote(SSchJob *pJob) { ...@@ -515,7 +520,13 @@ int32_t schFetchFromRemote(SSchJob *pJob) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (atomic_load_ptr(&pJob->res)) void *res = atomic_load_ptr(&pJob->res);
if (res) {
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
SCH_JOB_DLOG("res already fetched, res:%p", res);
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH)); SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH));
...@@ -525,12 +536,15 @@ _return: ...@@ -525,12 +536,15 @@ _return:
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
schProcessOnJobFailure(pJob, code);
return code; return code;
} }
// Note: no more error processing, handled in function internal // Note: no more error processing, handled in function internal
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
// if already FAILED, no more processing
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED)); SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED));
if (errCode) { if (errCode) {
...@@ -813,7 +827,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in ...@@ -813,7 +827,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
if (NULL == job || NULL == (*job)) { if (NULL == job || NULL == (*job)) {
qError("QID:%"PRIx64" taosHashGet queryId not exist", pParam->queryId); qError("QID:%"PRIx64" taosHashGet queryId not exist, may be dropped", pParam->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
...@@ -1147,7 +1161,7 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) { ...@@ -1147,7 +1161,7 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
int32_t size = (int32_t)taosArrayGetSize(pTask->execAddrs); int32_t size = (int32_t)taosArrayGetSize(pTask->execAddrs);
if (size <= 0) { if (size <= 0) {
SCH_TASK_DLOG("empty exec address, status:%d", SCH_GET_TASK_STATUS(pTask)); SCH_TASK_DLOG("task has no exec address, no need to drop it, status:%d", SCH_GET_TASK_STATUS(pTask));
return; return;
} }
...@@ -1157,6 +1171,8 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) { ...@@ -1157,6 +1171,8 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) {
schBuildAndSendMsg(pJob, pTask, addr, TDMT_VND_DROP_TASK); schBuildAndSendMsg(pJob, pTask, addr, TDMT_VND_DROP_TASK);
} }
SCH_TASK_DLOG("task has %d exec address", size);
} }
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
...@@ -1331,6 +1347,12 @@ int32_t scheduleFetchRows(void *job, void **data) { ...@@ -1331,6 +1347,12 @@ int32_t scheduleFetchRows(void *job, void **data) {
SSchJob *pJob = job; SSchJob *pJob = job;
int32_t code = 0; int32_t code = 0;
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (status == JOB_TASK_STATUS_DROPPING) {
SCH_JOB_ELOG("job is dropping, status:%d", status);
return TSDB_CODE_SCH_STATUS_ERROR;
}
atomic_add_fetch_32(&pJob->ref, 1); atomic_add_fetch_32(&pJob->ref, 1);
if (!SCH_JOB_NEED_FETCH(&pJob->attr)) { if (!SCH_JOB_NEED_FETCH(&pJob->attr)) {
...@@ -1345,8 +1367,6 @@ int32_t scheduleFetchRows(void *job, void **data) { ...@@ -1345,8 +1367,6 @@ int32_t scheduleFetchRows(void *job, void **data) {
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
int8_t status = SCH_GET_JOB_STATUS(pJob);
if (status == JOB_TASK_STATUS_FAILED) { if (status == JOB_TASK_STATUS_FAILED) {
*data = atomic_load_ptr(&pJob->res); *data = atomic_load_ptr(&pJob->res);
atomic_store_ptr(&pJob->res, NULL); atomic_store_ptr(&pJob->res, NULL);
...@@ -1414,6 +1434,10 @@ void scheduleFreeJob(void *job) { ...@@ -1414,6 +1434,10 @@ void scheduleFreeJob(void *job) {
return; return;
} }
schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING);
SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref));
while (true) { while (true) {
int32_t ref = atomic_load_32(&pJob->ref); int32_t ref = atomic_load_32(&pJob->ref);
if (0 == ref) { if (0 == ref) {
...@@ -1425,6 +1449,8 @@ void scheduleFreeJob(void *job) { ...@@ -1425,6 +1449,8 @@ void scheduleFreeJob(void *job) {
} }
} }
SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob));
if (pJob->status == JOB_TASK_STATUS_EXECUTING) { if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
schCancelJob(pJob); schCancelJob(pJob);
} }
......
...@@ -38,52 +38,73 @@ namespace { ...@@ -38,52 +38,73 @@ namespace {
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
/**
 * Initialize the log file used by the scheduler tests.
 *
 * Sets tsAsyncLog to 0 and qDebugFlag to 159, builds the path
 * "<tsLogDir>/taoslog" and hands it to taosInitLog together with
 * tsNumOfLogLines and a maximum log file count of 10.
 *
 * Failures are only reported on stdout; nothing is returned to the caller.
 */
void schtInitLogFile() {
  const char   *defaultLogFileNamePrefix = "taoslog";
  const int32_t maxLogFileNum = 10;

  tsAsyncLog = 0;
  qDebugFlag = 159;

  char temp[128] = {0};
  // Bounded formatting: tsLogDir is set elsewhere and may not fit into temp,
  // so never write past the end of the buffer.
  int len = snprintf(temp, sizeof(temp), "%s/%s", tsLogDir, defaultLogFileNamePrefix);
  if (len < 0 || len >= (int)sizeof(temp)) {
    printf("log file path too long in directory:%s\n", tsLogDir);
    return;
  }

  if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) {
    printf("failed to open log file in directory:%s\n", tsLogDir);
  }
}
void schtBuildQueryDag(SQueryDag *dag) { void schtBuildQueryDag(SQueryDag *dag) {
uint64_t qId = 0x0000000000000001; uint64_t qId = 0x0000000000000001;
dag->queryId = qId; dag->queryId = qId;
dag->numOfSubplans = 2; dag->numOfSubplans = 2;
dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
SArray *scan = taosArrayInit(1, sizeof(SSubplan)); SArray *scan = taosArrayInit(1, POINTER_BYTES);
SArray *merge = taosArrayInit(1, sizeof(SSubplan)); SArray *merge = taosArrayInit(1, POINTER_BYTES);
SSubplan scanPlan = {0}; SSubplan *scanPlan = (SSubplan *)calloc(1, sizeof(SSubplan));
SSubplan mergePlan = {0}; SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan));
scanPlan.id.queryId = qId; scanPlan->id.queryId = qId;
scanPlan.id.templateId = 0x0000000000000002; scanPlan->id.templateId = 0x0000000000000002;
scanPlan.id.subplanId = 0x0000000000000003; scanPlan->id.subplanId = 0x0000000000000003;
scanPlan.type = QUERY_TYPE_SCAN; scanPlan->type = QUERY_TYPE_SCAN;
scanPlan.execNode.numOfEps = 1; scanPlan->execNode.numOfEps = 1;
scanPlan.execNode.nodeId = 1; scanPlan->execNode.nodeId = 1;
scanPlan.execNode.inUse = 0; scanPlan->execNode.inUse = 0;
scanPlan.execNode.epAddr[0].port = 6030; scanPlan->execNode.epAddr[0].port = 6030;
strcpy(scanPlan.execNode.epAddr[0].fqdn, "ep0"); strcpy(scanPlan->execNode.epAddr[0].fqdn, "ep0");
scanPlan.pChildren = NULL; scanPlan->pChildren = NULL;
scanPlan.level = 1; scanPlan->level = 1;
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); scanPlan->pParents = taosArrayInit(1, POINTER_BYTES);
scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); scanPlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
mergePlan.id.queryId = qId; mergePlan->id.queryId = qId;
mergePlan.id.templateId = 0x4444444444; mergePlan->id.templateId = 0x4444444444;
mergePlan.id.subplanId = 0x5555555555; mergePlan->id.subplanId = 0x5555555555;
mergePlan.type = QUERY_TYPE_MERGE; mergePlan->type = QUERY_TYPE_MERGE;
mergePlan.level = 0; mergePlan->level = 0;
mergePlan.execNode.numOfEps = 0; mergePlan->execNode.numOfEps = 0;
mergePlan.pChildren = taosArrayInit(1, POINTER_BYTES); mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES);
mergePlan.pParents = NULL; mergePlan->pParents = NULL;
mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
taosArrayPush(mergePointer->pChildren, &scanPointer); taosArrayPush(mergePlan->pChildren, &scanPlan);
taosArrayPush(scanPointer->pParents, &mergePointer); taosArrayPush(scanPlan->pParents, &mergePlan);
taosArrayPush(dag->pSubplans, &merge); taosArrayPush(dag->pSubplans, &merge);
taosArrayPush(dag->pSubplans, &scan); taosArrayPush(dag->pSubplans, &scan);
} }
// Release hook for a test query DAG. The body is empty, so calling it
// currently frees nothing.
// NOTE(review): presumably intended to destroy the subplan arrays held by
// `dag` -- confirm and implement.
void schtFreeQueryDag(SQueryDag *dag) {
}
void schtBuildInsertDag(SQueryDag *dag) { void schtBuildInsertDag(SQueryDag *dag) {
uint64_t qId = 0x0000000000000002; uint64_t qId = 0x0000000000000002;
...@@ -138,8 +159,8 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) { ...@@ -138,8 +159,8 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
return 0; return 0;
} }
int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
return 0;
} }
...@@ -209,6 +230,9 @@ TEST(queryTest, normalCase) { ...@@ -209,6 +230,9 @@ TEST(queryTest, normalCase) {
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
void *pJob = NULL; void *pJob = NULL;
SQueryDag dag = {0}; SQueryDag dag = {0};
schtInitLogFile();
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
SEpAddr qnodeAddr = {0}; SEpAddr qnodeAddr = {0};
...@@ -295,6 +319,8 @@ TEST(queryTest, normalCase) { ...@@ -295,6 +319,8 @@ TEST(queryTest, normalCase) {
ASSERT_EQ(data, (void*)NULL); ASSERT_EQ(data, (void*)NULL);
scheduleFreeJob(pJob); scheduleFreeJob(pJob);
schtFreeQueryDag(&dag);
} }
...@@ -308,6 +334,9 @@ TEST(insertTest, normalCase) { ...@@ -308,6 +334,9 @@ TEST(insertTest, normalCase) {
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
SQueryDag dag = {0}; SQueryDag dag = {0};
uint64_t numOfRows = 0; uint64_t numOfRows = 0;
schtInitLogFile();
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
SEpAddr qnodeAddr = {0}; SEpAddr qnodeAddr = {0};
...@@ -336,7 +365,11 @@ TEST(insertTest, normalCase) { ...@@ -336,7 +365,11 @@ TEST(insertTest, normalCase) {
scheduleFreeJob(pInsertJob); scheduleFreeJob(pInsertJob);
} }
// multiThread.forceFree: at this point the test only initializes the test
// log file; it makes no assertions and does not schedule or free any job.
TEST(multiThread, forceFree) {
schtInitLogFile();
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
......
...@@ -1087,7 +1087,7 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { ...@@ -1087,7 +1087,7 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
return 0; return 0;
} }
return -1; return 0;
} }
char *taosGetCmdlineByPID(int pid) { char *taosGetCmdlineByPID(int pid) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册