提交 c9d9c3ff 编写于 作者: D dapan1121

enh: stop query

上级 7d5a13b8
......@@ -132,9 +132,14 @@ void closeAllRequests(SHashObj *pRequests) {
void destroyAppInst(SAppInstInfo* pAppInfo) {
tscDebug("destroy app inst mgr %p", pAppInfo);
taosThreadMutexLock(&appInfo.mutex);
hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey));
taosThreadMutexUnlock(&appInfo.mutex);
taosMemoryFreeClear(pAppInfo->instKey);
closeTransporter(pAppInfo);
......
......@@ -275,8 +275,11 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code)
int32_t rspNum = taosArrayGetSize(pRsp.rsps);
taosThreadMutexLock(&appInfo.mutex);
SAppInstInfo **pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
if (pInst == NULL || NULL == *pInst) {
taosThreadMutexUnlock(&appInfo.mutex);
tscError("cluster not exist, key:%s", key);
taosMemoryFreeClear(param);
tFreeClientHbBatchRsp(&pRsp);
......@@ -300,6 +303,8 @@ static int32_t hbAsyncCallBack(void *param, const SDataBuf *pMsg, int32_t code)
}
}
taosThreadMutexUnlock(&appInfo.mutex);
tFreeClientHbBatchRsp(&pRsp);
return code;
......
......@@ -826,8 +826,8 @@ _return:
pJob->jobResCode = code;
taosSsleep(2);
qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId);
//taosSsleep(2);
//qDebug("QID:0x%" PRIx64 " ctg after sleep", pJob->queryId);
taosAsyncExec(ctgCallUserCb, pJob, NULL);
......
......@@ -1083,7 +1083,7 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
qError("empty meta slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
qDebug("empty meta slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......
......@@ -218,9 +218,7 @@ typedef struct SSchJob {
int32_t levelIdx;
SEpSet dataSrcEps;
SHashObj *taskList;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
SHashObj *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask*
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
SExplainCtx *explainCtx;
......
......@@ -85,20 +85,6 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->succTasks =
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->succTasks) {
SCH_JOB_ELOG("taosHashInit %d succTasks failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->failTasks =
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->failTasks) {
SCH_JOB_ELOG("taosHashInit %d failTasks failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
tsem_init(&pJob->rspSem, 0, 0);
refId = taosAddRef(schMgmt.jobRef, pJob);
......@@ -724,6 +710,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
/*
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
......@@ -801,6 +788,7 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
return TSDB_CODE_SUCCESS;
}
*/
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
int8_t status = 0;
......@@ -1047,9 +1035,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
if (!needRetry) {
SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved));
} else {
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) {
SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
......@@ -1115,8 +1101,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_LOG_TASK_END_TS(pTask);
SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED);
SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));
......@@ -1150,8 +1134,6 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
pJob->fetchTask = pTask;
SCH_ERR_JRET(schMoveTaskToExecList(pJob, pTask, &moved));
SCH_RET(schProcessOnJobPartialSuccess(pJob));
}
......@@ -1466,8 +1448,8 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
void schDropJobAllTasks(SSchJob *pJob) {
schDropTaskInHashList(pJob, pJob->execTasks);
schDropTaskInHashList(pJob, pJob->succTasks);
schDropTaskInHashList(pJob, pJob->failTasks);
// schDropTaskInHashList(pJob, pJob->succTasks);
// schDropTaskInHashList(pJob, pJob->failTasks);
}
int32_t schCancelJob(SSchJob *pJob) {
......@@ -1507,8 +1489,8 @@ void schFreeJobImpl(void *job) {
schFreeFlowCtrl(pJob);
taosHashCleanup(pJob->execTasks);
taosHashCleanup(pJob->failTasks);
taosHashCleanup(pJob->succTasks);
// taosHashCleanup(pJob->failTasks);
// taosHashCleanup(pJob->succTasks);
taosHashCleanup(pJob->taskList);
taosArrayDestroy(pJob->levels);
......
......@@ -36,7 +36,7 @@ int64_t st, et;
char hostName[128];
char dbName[128];
char tbName[128];
int32_t runTimes = 1;
int32_t runTimes = 10000;
typedef struct {
int id;
......@@ -367,8 +367,7 @@ void *closeThreadFp(void *arg) {
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg;
while (true) {
if (qParam->taos) {
//usleep(rand() % 10000);
usleep(1000000);
usleep(rand() % 10000);
taos_close(qParam->taos);
break;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册