diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 910a7b41126536eb0e7838e8aaf196b64642e5f4..97b962e53acc27f5a5f5e09adf97dc9bc0ef1801 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -381,6 +381,7 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) { taosScheduleTask(tscQhandle, &schedMsg); } + void tscQueueAsyncRes(SSqlObj *pSql) { if (pSql == NULL || pSql->signature != pSql) { tscDebug("%p SqlObj is freed, not add into queue async res", pSql); @@ -390,7 +391,10 @@ void tscQueueAsyncRes(SSqlObj *pSql) { tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code)); SSqlRes *pRes = &pSql->res; - assert(pSql->fp != NULL && pSql->fetchFp != NULL); + + if (pSql->fp == NULL || pSql->fetchFp == NULL){ + return; + } pSql->fp = pSql->fetchFp; (*pSql->fp)(pSql->param, pSql, pRes->code); diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index 7e5ebb059667c5fc4d530f1fab3957a962ed3fda..24156956174364ae1be6abfa2af4c3db7c0db712 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -86,6 +86,6 @@ extern void set_terminal_mode(); extern int get_old_terminal_mode(struct termios* tio); extern void reset_terminal_mode(); extern SShellArguments args; -extern TAOS_RES* result; +extern int64_t result; #endif diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 7a9e242668822cd4383050fa2dcebe248024f9e7..627d06ac2e59ec2bdb222998ea05018c038830ab 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -46,7 +46,7 @@ char CONTINUE_PROMPT[] = " -> "; int prompt_size = 6; #endif -TAOS_RES *result = NULL; +int64_t result = 0; SShellHistory history; #define DEFAULT_MAX_BINARY_DISPLAY_WIDTH 30 @@ -260,6 +260,14 @@ int32_t shellRunCommand(TAOS* con, char* command) { } +void freeResultWithRid(int64_t rid) { + SSqlObj* pSql = taosAcquireRef(tscObjRef, rid); + if(pSql){ + taos_free_result(pSql); + taosReleaseRef(tscObjRef, rid); + } +} + void shellRunCommandOnServer(TAOS *con, char command[]) { int64_t st, et; wordexp_t full_path; @@ -294,18 +302,22 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { st = taosGetTimestampUs(); - TAOS_RES* pSql = taos_query_h(con, command, &result); + TAOS_RES* tmpSql = NULL; + TAOS_RES* pSql = taos_query_h(con, command, &tmpSql); if (taos_errno(pSql)) { taos_error(pSql, st); return; } + atomic_store_64(&result, ((SSqlObj*)tmpSql)->self); + int64_t oresult = atomic_load_64(&result); + if (regex_match(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) { fprintf(stdout, "Database changed.\n\n"); fflush(stdout); - atomic_store_ptr(&result, 0); - taos_free_result(pSql); + atomic_store_64(&result, 0); + freeResultWithRid(oresult); return; } @@ -313,8 +325,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { int error_no = 0; int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode); if (numOfRows < 0) { - atomic_store_ptr(&result, 0); - taos_free_result(pSql); + atomic_store_64(&result, 0); + freeResultWithRid(oresult); return; } @@ -336,8 +348,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { wordfree(&full_path); } - atomic_store_ptr(&result, 0); - taos_free_result(pSql); + atomic_store_64(&result, 0); + freeResultWithRid(oresult); } /* Function to do regular expression check */ @@ -501,7 +513,7 @@ static int dumpResultToFile(const char* fname, TAOS_RES* tres) { row = taos_fetch_row(tres); } while( row != NULL); - result = NULL; + result = 0; fclose(fp); return numOfRows; diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 4f0c5e3f9938b2d3c5281a902e4dbc44c221bf82..041ad71ccba47cf2b84984af89d25ff15ccf16ce 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -19,15 +19,31 @@ #include "tnettest.h" pthread_t pid; +static tsem_t cancelSem; void shellQueryInterruptHandler(int signum) { + tsem_post(&cancelSem); +} + +void *cancelHandler(void *arg) { + while(1) { + if (tsem_wait(&cancelSem) != 0) { + taosMsleep(10); + continue; + } + #ifdef LINUX - void* pResHandle = atomic_val_compare_exchange_64(&result, result, 0); - taos_stop_query(pResHandle); + int64_t rid = atomic_val_compare_exchange_64(&result, result, 0); + SSqlObj* pSql = taosAcquireRef(tscObjRef, rid); + taos_stop_query(pSql); + taosReleaseRef(tscObjRef, rid); #else - printf("\nReceive ctrl+c or other signal, quit shell.\n"); - exit(0); + printf("\nReceive ctrl+c or other signal, quit shell.\n"); + exit(0); #endif + } + + return NULL; } int checkVersion() { @@ -105,6 +121,14 @@ int main(int argc, char* argv[]) { exit(EXIT_FAILURE); } + if (tsem_init(&cancelSem, 0, 0) != 0) { + printf("failed to create cancel semphore\n"); + exit(EXIT_FAILURE); + } + + pthread_t spid; + pthread_create(&spid, NULL, cancelHandler, NULL); + /* Interrupt handler. */ struct sigaction act; memset(&act, 0, sizeof(struct sigaction));