未验证 提交 0ad6af6c 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #14873 from taosdata/enh/stopquery_a

enh: enhance stop query processing
...@@ -181,6 +181,7 @@ typedef struct SRequestSendRecvBody { ...@@ -181,6 +181,7 @@ typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now tsem_t rspSem; // not used now
__taos_async_fn_t queryFp; __taos_async_fn_t queryFp;
__taos_async_fn_t fetchFp; __taos_async_fn_t fetchFp;
EQueryExecMode execMode;
void* param; void* param;
SDataBuf requestMsg; SDataBuf requestMsg;
int64_t queryJob; // query job, created according to sql query DAG. int64_t queryJob; // query job, created according to sql query DAG.
......
...@@ -153,7 +153,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, ...@@ -153,7 +153,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
*pRequest = createRequest(connId, TSDB_SQL_SELECT); *pRequest = createRequest(connId, TSDB_SQL_SELECT);
if (*pRequest == NULL) { if (*pRequest == NULL) {
tscError("failed to malloc sqlObj, %s", sql); tscError("failed to malloc sqlObj, %s", sql);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return terrno;
} }
(*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1); (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
...@@ -933,6 +933,8 @@ SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool vali ...@@ -933,6 +933,8 @@ SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool vali
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta) { void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta) {
int32_t code = 0; int32_t code = 0;
pRequest->body.execMode = pQuery->execMode;
switch (pQuery->execMode) { switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL: case QUERY_EXEC_MODE_LOCAL:
asyncExecLocalCmd(pRequest, pQuery); asyncExecLocalCmd(pRequest, pQuery);
...@@ -1149,7 +1151,6 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t ...@@ -1149,7 +1151,6 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT); SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT);
if (pRequest == NULL) { if (pRequest == NULL) {
destroyTscObj(pTscObj); destroyTscObj(pTscObj);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL; return NULL;
} }
......
...@@ -49,7 +49,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) { ...@@ -49,7 +49,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
} }
// this function may be called by user or system, or by both simultaneously. // this function may be called by user or system, or by both simultaneously.
void taos_cleanup(void) { void taos_cleanup(void) {
tscInfo("start to cleanup client environment"); tscDebug("start to cleanup client environment");
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) { if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
return; return;
} }
...@@ -58,7 +58,10 @@ void taos_cleanup(void) { ...@@ -58,7 +58,10 @@ void taos_cleanup(void) {
clientReqRefPool = -1; clientReqRefPool = -1;
taosCloseRef(id); taosCloseRef(id);
cleanupTaskQueue(); hbMgrCleanUp();
catalogDestroy();
schedulerDestroy();
fmFuncMgtDestroy(); fmFuncMgtDestroy();
qCleanupKeywordsTable(); qCleanupKeywordsTable();
...@@ -67,12 +70,11 @@ void taos_cleanup(void) { ...@@ -67,12 +70,11 @@ void taos_cleanup(void) {
clientConnRefPool = -1; clientConnRefPool = -1;
taosCloseRef(id); taosCloseRef(id);
hbMgrCleanUp(); rpcCleanup();
tscDebug("rpc cleanup");
catalogDestroy(); cleanupTaskQueue();
schedulerDestroy();
rpcCleanup();
tscInfo("all local resources released"); tscInfo("all local resources released");
taosCleanupCfg(); taosCleanupCfg();
taosCloseLog(); taosCloseLog();
...@@ -852,27 +854,24 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ...@@ -852,27 +854,24 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
} }
// all data has returned to App already, no need to try again // all data has returned to App already, no need to try again
if (pResultInfo->completed && (pRequest->body.queryJob != 0)) { if (pResultInfo->completed) {
pResultInfo->numOfRows = 0; // it is a local executed query, no need to do async fetch
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows); if (QUERY_EXEC_MODE_LOCAL == pRequest->body.execMode) {
return; ASSERT(pResultInfo->numOfRows >= 0);
} if (pResultInfo->localResultFetched) {
pResultInfo->numOfRows = 0;
// it is a local executed query, no need to do async fetch pResultInfo->current = 0;
if (pRequest->body.queryJob == 0) { } else {
ASSERT(pResultInfo->completed && pResultInfo->numOfRows >= 0); pResultInfo->localResultFetched = true;
if (pResultInfo->localResultFetched) { }
pResultInfo->numOfRows = 0;
pResultInfo->current = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
} else { } else {
pResultInfo->localResultFetched = true; pResultInfo->numOfRows = 0;
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
} }
pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
return; return;
} }
SSchedulerReq req = { SSchedulerReq req = {
.syncReq = false, .syncReq = false,
.fetchFp = fetchCallback, .fetchFp = fetchCallback,
......
...@@ -1293,7 +1293,7 @@ void catalogDestroy(void) { ...@@ -1293,7 +1293,7 @@ void catalogDestroy(void) {
if (!taosCheckCurrentInDll()) { if (!taosCheckCurrentInDll()) {
ctgClearCacheEnqueue(NULL, true, true, true); ctgClearCacheEnqueue(NULL, true, true, true);
taosThreadJoin(gCtgMgmt.updateThread, NULL); taosThreadJoin(gCtgMgmt.updateThread, NULL);
} }
taosHashCleanup(gCtgMgmt.pCluster); taosHashCleanup(gCtgMgmt.pCluster);
......
...@@ -223,6 +223,7 @@ typedef struct SSchJobAttr { ...@@ -223,6 +223,7 @@ typedef struct SSchJobAttr {
typedef struct { typedef struct {
int32_t op; int32_t op;
SRWLatch lock;
bool syncReq; bool syncReq;
} SSchOpStatus; } SSchOpStatus;
...@@ -473,6 +474,7 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas ...@@ -473,6 +474,7 @@ int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTas
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode); void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode);
bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync);
extern SSchDebug gSCHDebug; extern SSchDebug gSCHDebug;
......
...@@ -443,25 +443,37 @@ int32_t schNotifyUserFetchRes(SSchJob* pJob) { ...@@ -443,25 +443,37 @@ int32_t schNotifyUserFetchRes(SSchJob* pJob) {
} }
void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) { void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
if (SCH_OP_NULL == pJob->opStatus.op) { if (SCH_OP_NULL == pJob->opStatus.op) {
SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status)); SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status));
return; goto _return;
} }
if (op && pJob->opStatus.op != op) { if (op && pJob->opStatus.op != op) {
SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op)); SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op));
return; goto _return;
} }
if (SCH_JOB_IN_SYNC_OP(pJob)) { if (SCH_JOB_IN_SYNC_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
tsem_post(&pJob->rspSem); tsem_post(&pJob->rspSem);
} else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) { } else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schNotifyUserExecRes(pJob); schNotifyUserExecRes(pJob);
} else if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) { } else if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schNotifyUserFetchRes(pJob); schNotifyUserFetchRes(pJob);
} else { } else {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status)); SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
} }
return;
_return:
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
} }
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
...@@ -658,13 +670,13 @@ int32_t schJobFetchRows(SSchJob *pJob) { ...@@ -658,13 +670,13 @@ int32_t schJobFetchRows(SSchJob *pJob) {
if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) { if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
SCH_ERR_RET(schLaunchFetchTask(pJob)); SCH_ERR_RET(schLaunchFetchTask(pJob));
if (pJob->opStatus.syncReq) { if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem); tsem_wait(&pJob->rspSem);
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes)); SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
} }
} else { } else {
if (pJob->opStatus.syncReq) { if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes)); SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
} else { } else {
schPostJobRes(pJob, SCH_OP_FETCH); schPostJobRes(pJob, SCH_OP_FETCH);
...@@ -775,25 +787,37 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) { ...@@ -775,25 +787,37 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) {
} }
} }
bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync) {
SCH_LOCK(SCH_READ, &pJob->opStatus.lock);
bool r = (pJob->opStatus.op == op) && (pJob->opStatus.syncReq == sync);
SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock);
return r;
}
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
int32_t op = 0; int32_t op = 0;
switch (type) { switch (type) {
case SCH_OP_EXEC: case SCH_OP_EXEC:
if (pReq && pReq->syncReq) { if (pReq && pReq->syncReq) {
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) { if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
} }
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schDumpJobExecRes(pJob, pReq->pExecRes); schDumpJobExecRes(pJob, pReq->pExecRes);
} }
break; break;
case SCH_OP_FETCH: case SCH_OP_FETCH:
if (pReq && pReq->syncReq) { if (pReq && pReq->syncReq) {
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) { if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
} }
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
} }
break; break;
case SCH_OP_GET_STATUS: case SCH_OP_GET_STATUS:
...@@ -816,8 +840,10 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq ...@@ -816,8 +840,10 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
switch (type) { switch (type) {
case SCH_OP_EXEC: case SCH_OP_EXEC:
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR); schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
} }
...@@ -825,10 +851,13 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq ...@@ -825,10 +851,13 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op)); SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
pJob->opStatus.syncReq = pReq->syncReq; pJob->opStatus.syncReq = pReq->syncReq;
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
break; break;
case SCH_OP_FETCH: case SCH_OP_FETCH:
SCH_LOCK(SCH_WRITE, &pJob->opStatus.lock);
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR); schDirectPostJobRes(pReq, TSDB_CODE_TSC_APP_ERROR);
SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR); SCH_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
} }
...@@ -840,6 +869,7 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq ...@@ -840,6 +869,7 @@ int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
pJob->userRes.cbParam = pReq->cbParam; pJob->userRes.cbParam = pReq->cbParam;
pJob->opStatus.syncReq = pReq->syncReq; pJob->opStatus.syncReq = pReq->syncReq;
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
if (!SCH_JOB_NEED_FETCH(pJob)) { if (!SCH_JOB_NEED_FETCH(pJob)) {
SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob)); SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
......
...@@ -505,6 +505,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo ...@@ -505,6 +505,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/*
if (SCH_IS_DATA_BIND_TASK(pTask)) { if (SCH_IS_DATA_BIND_TASK(pTask)) {
if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
*needRetry = false; *needRetry = false;
...@@ -522,6 +523,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo ...@@ -522,6 +523,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
*/
*needRetry = true; *needRetry = true;
SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode)); SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));
......
...@@ -431,7 +431,7 @@ static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) { ...@@ -431,7 +431,7 @@ static int32_t taosDecRefCount(int32_t rsetId, int64_t rid, int32_t remove) {
} }
released = 1; released = 1;
} else { } else {
uTrace("rsetId:%d p:%p rid:%" PRId64 " is released", rsetId, pNode->p, rid); uTrace("rsetId:%d p:%p rid:%" PRId64 " is released, remain count %d", rsetId, pNode->p, rid, pNode->count);
} }
} else { } else {
uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid); uTrace("rsetId:%d rid:%" PRId64 " is not there, failed to release/remove", rsetId, rid);
......
...@@ -129,7 +129,7 @@ void *taosProcessSchedQueue(void *scheduler) { ...@@ -129,7 +129,7 @@ void *taosProcessSchedQueue(void *scheduler) {
while (1) { while (1) {
if ((ret = tsem_wait(&pSched->fullSem)) != 0) { if ((ret = tsem_wait(&pSched->fullSem)) != 0) {
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
exit(ret); ASSERT(0);
} }
if (pSched->stop) { if (pSched->stop) {
break; break;
...@@ -137,7 +137,7 @@ void *taosProcessSchedQueue(void *scheduler) { ...@@ -137,7 +137,7 @@ void *taosProcessSchedQueue(void *scheduler) {
if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) { if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) {
uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret); ASSERT(0);
} }
msg = pSched->queue[pSched->fullSlot]; msg = pSched->queue[pSched->fullSlot];
...@@ -146,12 +146,12 @@ void *taosProcessSchedQueue(void *scheduler) { ...@@ -146,12 +146,12 @@ void *taosProcessSchedQueue(void *scheduler) {
if ((ret = taosThreadMutexUnlock(&pSched->queueMutex)) != 0) { if ((ret = taosThreadMutexUnlock(&pSched->queueMutex)) != 0) {
uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret); ASSERT(0);
} }
if ((ret = tsem_post(&pSched->emptySem)) != 0) { if ((ret = tsem_post(&pSched->emptySem)) != 0) {
uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno)); uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno));
exit(ret); ASSERT(0);
} }
if (msg.fp) if (msg.fp)
...@@ -174,12 +174,12 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { ...@@ -174,12 +174,12 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
if ((ret = tsem_wait(&pSched->emptySem)) != 0) { if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
exit(ret); ASSERT(0);
} }
if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) { if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) {
uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret); ASSERT(0);
} }
pSched->queue[pSched->emptySlot] = *pMsg; pSched->queue[pSched->emptySlot] = *pMsg;
...@@ -187,12 +187,12 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { ...@@ -187,12 +187,12 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
if ((ret = taosThreadMutexUnlock(&pSched->queueMutex)) != 0) { if ((ret = taosThreadMutexUnlock(&pSched->queueMutex)) != 0) {
uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret); ASSERT(0);
} }
if ((ret = tsem_post(&pSched->fullSem)) != 0) { if ((ret = tsem_post(&pSched->fullSem)) != 0) {
uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno)); uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno));
exit(ret); ASSERT(0);
} }
} }
...@@ -200,6 +200,8 @@ void taosCleanUpScheduler(void *param) { ...@@ -200,6 +200,8 @@ void taosCleanUpScheduler(void *param) {
SSchedQueue *pSched = (SSchedQueue *)param; SSchedQueue *pSched = (SSchedQueue *)param;
if (pSched == NULL) return; if (pSched == NULL) return;
uDebug("start to cleanup %s schedQsueue", pSched->label);
pSched->stop = true; pSched->stop = true;
for (int32_t i = 0; i < pSched->numOfThreads; ++i) { for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) { if (taosCheckPthreadValid(pSched->qthread[i])) {
......
...@@ -36,7 +36,7 @@ int64_t st, et; ...@@ -36,7 +36,7 @@ int64_t st, et;
char hostName[128]; char hostName[128];
char dbName[128]; char dbName[128];
char tbName[128]; char tbName[128];
int32_t runTimes = 10000; int32_t runTimes = 1000;
typedef struct { typedef struct {
int id; int id;
...@@ -85,9 +85,12 @@ static void sqExecSQLE(TAOS *taos, char *command) { ...@@ -85,9 +85,12 @@ static void sqExecSQLE(TAOS *taos, char *command) {
taos_free_result(pSql); taos_free_result(pSql);
} }
void sqError(char* prefix, const char* errMsg) {
fprintf(stderr, "%s error: %s\n", prefix, errMsg);
}
void sqExit(char* prefix, const char* errMsg) { void sqExit(char* prefix, const char* errMsg) {
fprintf(stderr, "%s error: %s\n", prefix, errMsg); sqError(prefix, errMsg);
exit(1); exit(1);
} }
...@@ -141,16 +144,20 @@ void sqCloseFetchCb(void *param, TAOS_RES *pRes, int numOfRows) { ...@@ -141,16 +144,20 @@ void sqCloseFetchCb(void *param, TAOS_RES *pRes, int numOfRows) {
taos_close(qParam->taos); taos_close(qParam->taos);
*qParam->end = 1; *qParam->end = 1;
taos_free_result(pRes);
} }
void sqCloseQueryCb(void *param, TAOS_RES *pRes, int code) { void sqCloseQueryCb(void *param, TAOS_RES *pRes, int code) {
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param; SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
if (code == 0 && pRes) { if (code == 0 && pRes) {
if (qParam->fetch) { if (qParam->fetch) {
taos_fetch_rows_a(pRes, sqFreeFetchCb, param); taos_fetch_rows_a(pRes, sqCloseFetchCb, param);
} else { } else {
taos_close(qParam->taos); taos_close(qParam->taos);
*qParam->end = 1; *qParam->end = 1;
taos_free_result(pRes);
} }
} else { } else {
sqExit("select", taos_errstr(pRes)); sqExit("select", taos_errstr(pRes));
...@@ -203,7 +210,9 @@ void sqAsyncQueryCb(void *param, TAOS_RES *pRes, int code) { ...@@ -203,7 +210,9 @@ void sqAsyncQueryCb(void *param, TAOS_RES *pRes, int code) {
*qParam->end = 1; *qParam->end = 1;
} }
} else { } else {
sqExit("select", taos_errstr(pRes)); sqError("select", taos_errstr(pRes));
*qParam->end = 1;
taos_free_result(pRes);
} }
} }
...@@ -358,6 +367,7 @@ int sqCloseSyncQuery(bool fetch) { ...@@ -358,6 +367,7 @@ int sqCloseSyncQuery(bool fetch) {
} }
taos_close(taos); taos_close(taos);
taos_free_result(pRes);
} }
CASE_LEAVE(); CASE_LEAVE();
} }
...@@ -382,7 +392,7 @@ int sqCloseAsyncQuery(bool fetch) { ...@@ -382,7 +392,7 @@ int sqCloseAsyncQuery(bool fetch) {
SSP_CB_PARAM param = {0}; SSP_CB_PARAM param = {0};
param.fetch = fetch; param.fetch = fetch;
param.end = &qEnd; param.end = &qEnd;
taos_query_a(taos, sql, sqFreeQueryCb, &param); taos_query_a(taos, sql, sqCloseQueryCb, &param);
while (0 == qEnd) { while (0 == qEnd) {
usleep(5000); usleep(5000);
} }
...@@ -457,8 +467,6 @@ void *closeThreadFp(void *arg) { ...@@ -457,8 +467,6 @@ void *closeThreadFp(void *arg) {
} }
} }
void *killThreadFp(void *arg) { void *killThreadFp(void *arg) {
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg; SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg;
while (true) { while (true) {
...@@ -471,6 +479,19 @@ void *killThreadFp(void *arg) { ...@@ -471,6 +479,19 @@ void *killThreadFp(void *arg) {
} }
} }
void *cleanupThreadFp(void *arg) {
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg;
while (true) {
if (qParam->taos) {
usleep(rand() % 10000);
taos_cleanup();
break;
}
usleep(1);
}
}
int sqConCloseSyncQuery(bool fetch) { int sqConCloseSyncQuery(bool fetch) {
...@@ -578,6 +599,8 @@ int sqConKillSyncQuery(bool fetch) { ...@@ -578,6 +599,8 @@ int sqConKillSyncQuery(bool fetch) {
pthread_join(qid, NULL); pthread_join(qid, NULL);
pthread_join(cid, NULL); pthread_join(cid, NULL);
taos_close(param.taos);
} }
CASE_LEAVE(); CASE_LEAVE();
} }
...@@ -593,6 +616,40 @@ int sqConKillAsyncQuery(bool fetch) { ...@@ -593,6 +616,40 @@ int sqConKillAsyncQuery(bool fetch) {
pthread_join(qid, NULL); pthread_join(qid, NULL);
pthread_join(cid, NULL); pthread_join(cid, NULL);
taos_close(param.taos);
}
CASE_LEAVE();
}
int sqConCleanupSyncQuery(bool fetch) {
CASE_ENTER();
pthread_t qid, cid;
for (int32_t i = 0; i < runTimes; ++i) {
SSP_CB_PARAM param = {0};
param.fetch = fetch;
pthread_create(&qid, NULL, syncQueryThreadFp, (void*)&param);
pthread_create(&cid, NULL, cleanupThreadFp, (void*)&param);
pthread_join(qid, NULL);
pthread_join(cid, NULL);
break;
}
CASE_LEAVE();
}
int sqConCleanupAsyncQuery(bool fetch) {
CASE_ENTER();
pthread_t qid, cid;
for (int32_t i = 0; i < runTimes; ++i) {
SSP_CB_PARAM param = {0};
param.fetch = fetch;
pthread_create(&qid, NULL, asyncQueryThreadFp, (void*)&param);
pthread_create(&cid, NULL, cleanupThreadFp, (void*)&param);
pthread_join(qid, NULL);
pthread_join(cid, NULL);
break;
} }
CASE_LEAVE(); CASE_LEAVE();
} }
...@@ -600,7 +657,7 @@ int sqConKillAsyncQuery(bool fetch) { ...@@ -600,7 +657,7 @@ int sqConKillAsyncQuery(bool fetch) {
void sqRunAllCase(void) { void sqRunAllCase(void) {
/* #if 1
sqStopSyncQuery(false); sqStopSyncQuery(false);
sqStopSyncQuery(true); sqStopSyncQuery(true);
sqStopAsyncQuery(false); sqStopAsyncQuery(false);
...@@ -620,23 +677,26 @@ void sqRunAllCase(void) { ...@@ -620,23 +677,26 @@ void sqRunAllCase(void) {
sqConCloseSyncQuery(true); sqConCloseSyncQuery(true);
sqConCloseAsyncQuery(false); sqConCloseAsyncQuery(false);
sqConCloseAsyncQuery(true); sqConCloseAsyncQuery(true);
*/
#if 0
sqKillSyncQuery(false); sqKillSyncQuery(false);
sqKillSyncQuery(true); sqKillSyncQuery(true);
sqKillAsyncQuery(false); sqKillAsyncQuery(false);
sqKillAsyncQuery(true); sqKillAsyncQuery(true);
#endif
//sqConKillSyncQuery(false); sqConKillSyncQuery(false);
sqConKillSyncQuery(true); sqConKillSyncQuery(true);
#if 0
sqConKillAsyncQuery(false); sqConKillAsyncQuery(false);
sqConKillAsyncQuery(true); sqConKillAsyncQuery(true);
#endif #endif
/*
sqConCleanupSyncQuery(false);
sqConCleanupSyncQuery(true);
sqConCleanupAsyncQuery(false);
sqConCleanupAsyncQuery(true);
*/
int32_t l = 5; int32_t l = 5;
while (l) { while (l) {
printf("%d\n", l--); printf("%d\n", l--);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册