提交 7891ff0f 编写于 作者: D dapan1121

fix: fix stop query issue

上级 cfa989e6
......@@ -42,7 +42,7 @@ volatile int32_t tscInitRes = 0;
void initTscQhandle() {
// init handle
tscQhandle = taosInitScheduler(4096, 5, "tsc");
tscQhandle = taosInitScheduler(4096, 5, "tscQ");
}
void cleanupTscQhandle() {
......
......@@ -153,7 +153,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
*pRequest = createRequest(connId, TSDB_SQL_SELECT);
if (*pRequest == NULL) {
tscError("failed to malloc sqlObj, %s", sql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
return terrno;
}
(*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
......
......@@ -44,7 +44,7 @@ void taosWLockLatch(SRWLatch *pLatch) {
nLoops = 0;
while (1) {
oLatch = atomic_load_32(pLatch);
if (0 == oLatch) break;
if (oLatch == TD_RWLATCH_WRITE_FLAG) break;
nLoops++;
if (nLoops > 1000) {
sched_yield();
......
......@@ -129,7 +129,7 @@ void *taosProcessSchedQueue(void *scheduler) {
while (1) {
if ((ret = tsem_wait(&pSched->fullSem)) != 0) {
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
exit(ret);
ASSERT(0);
}
if (pSched->stop) {
break;
......@@ -137,7 +137,7 @@ void *taosProcessSchedQueue(void *scheduler) {
if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) {
uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
ASSERT(0);
}
msg = pSched->queue[pSched->fullSlot];
......@@ -146,12 +146,12 @@ void *taosProcessSchedQueue(void *scheduler) {
if ((ret = taosThreadMutexUnlock(&pSched->queueMutex)) != 0) {
uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
ASSERT(0);
}
if ((ret = tsem_post(&pSched->emptySem)) != 0) {
uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno));
exit(ret);
ASSERT(0);
}
if (msg.fp)
......@@ -174,12 +174,12 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
exit(ret);
ASSERT(0);
}
if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) {
uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
ASSERT(0);
}
pSched->queue[pSched->emptySlot] = *pMsg;
......@@ -187,12 +187,12 @@ void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
if ((ret = taosThreadMutexUnlock(&pSched->queueMutex)) != 0) {
uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
ASSERT(0);
}
if ((ret = tsem_post(&pSched->fullSem)) != 0) {
uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno));
exit(ret);
ASSERT(0);
}
}
......@@ -200,6 +200,8 @@ void taosCleanUpScheduler(void *param) {
SSchedQueue *pSched = (SSchedQueue *)param;
if (pSched == NULL) return;
uDebug("start to cleanup %s schedQsueue", pSched->label);
pSched->stop = true;
for (int32_t i = 0; i < pSched->numOfThreads; ++i) {
if (taosCheckPthreadValid(pSched->qthread[i])) {
......
......@@ -85,10 +85,12 @@ static void sqExecSQLE(TAOS *taos, char *command) {
taos_free_result(pSql);
}
void sqError(char* prefix, const char* errMsg) {
fprintf(stderr, "%s error: %s\n", prefix, errMsg);
}
void sqExit(char* prefix, const char* errMsg) {
fprintf(stderr, "%s error: %s\n", prefix, errMsg);
sleep(10000);
sqError(prefix, errMsg);
exit(1);
}
......@@ -208,7 +210,9 @@ void sqAsyncQueryCb(void *param, TAOS_RES *pRes, int code) {
*qParam->end = 1;
}
} else {
sqExit("select", taos_errstr(pRes));
sqError("select", taos_errstr(pRes));
*qParam->end = 1;
taos_free_result(pRes);
}
}
......@@ -463,8 +467,6 @@ void *closeThreadFp(void *arg) {
}
}
void *killThreadFp(void *arg) {
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg;
while (true) {
......@@ -477,6 +479,19 @@ void *killThreadFp(void *arg) {
}
}
void *cleanupThreadFp(void *arg) {
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg;
while (true) {
if (qParam->taos) {
usleep(rand() % 10000);
taos_cleanup();
break;
}
usleep(1);
}
}
int sqConCloseSyncQuery(bool fetch) {
......@@ -607,9 +622,40 @@ int sqConKillAsyncQuery(bool fetch) {
CASE_LEAVE();
}
int sqConCleanupSyncQuery(bool fetch) {
CASE_ENTER();
pthread_t qid, cid;
for (int32_t i = 0; i < runTimes; ++i) {
SSP_CB_PARAM param = {0};
param.fetch = fetch;
pthread_create(&qid, NULL, syncQueryThreadFp, (void*)&param);
pthread_create(&cid, NULL, cleanupThreadFp, (void*)&param);
pthread_join(qid, NULL);
pthread_join(cid, NULL);
}
CASE_LEAVE();
}
int sqConCleanupAsyncQuery(bool fetch) {
CASE_ENTER();
pthread_t qid, cid;
for (int32_t i = 0; i < runTimes; ++i) {
SSP_CB_PARAM param = {0};
param.fetch = fetch;
pthread_create(&qid, NULL, asyncQueryThreadFp, (void*)&param);
pthread_create(&cid, NULL, cleanupThreadFp, (void*)&param);
pthread_join(qid, NULL);
pthread_join(cid, NULL);
}
CASE_LEAVE();
}
void sqRunAllCase(void) {
#if 0
sqStopSyncQuery(false);
sqStopSyncQuery(true);
sqStopAsyncQuery(false);
......@@ -638,11 +684,16 @@ void sqRunAllCase(void) {
sqConKillSyncQuery(false);
sqConKillSyncQuery(true);
#if 0
sqConKillAsyncQuery(false);
sqConKillAsyncQuery(true);
#endif
sqConCleanupSyncQuery(false);
sqConCleanupSyncQuery(true);
sqConCleanupAsyncQuery(false);
sqConCleanupAsyncQuery(true);
int32_t l = 5;
while (l) {
printf("%d\n", l--);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册