diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 590f205e1da87d3bc434f374b6c173905a46c184..72ca96891acb2ecbab27aedf47b2e97fc4a487d6 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -270,6 +270,9 @@ void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRo void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); int tscSetMgmtEpSetFromCfg(const char *first, const char *second); +bool tscSetSqlOwner(SSqlObj* pSql); +void tscClearSqlOwner(SSqlObj* pSql); + void* malloc_throw(size_t size); void* calloc_throw(size_t nmemb, size_t size); char* strdup_throw(const char* str); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 3ad4cd9455e84dda50bbc8297f37bece022d6b09..49f7cec8893cc425ab4e35b53e192ec9ee57e7e7 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -302,6 +302,7 @@ typedef struct STscObj { typedef struct SSqlObj { void *signature; + pthread_t owner; // owner of sql object, by which it is executed STscObj *pTscObj; void *pRpcCtx; void (*fp)(); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index af8e913563e74a6a86e682ec5b278820a9d00569..f01f1aa384876af2534bb60a7b5c67f3dd62e40a 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -233,48 +233,21 @@ static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) { sem_post(&pSql->rspSem); } -TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) { +TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } - int32_t sqlLen = strlen(sqlstr); if (sqlLen > tsMaxSQLStringLen) { tscError("sql string exceeds max length:%d", tsMaxSQLStringLen); terrno = TSDB_CODE_TSC_INVALID_SQL; return NULL; } - + taosNotePrintTsc(sqlstr); - - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); - if (pSql == NULL) { - tscError("failed to malloc sqlObj"); - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - return NULL; - } - - doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); - // wait for the callback function to post the semaphore - tsem_wait(&pSql->rspSem); - return pSql; -} -TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { - STscObj *pObj = (STscObj *)taos; - if (pObj == NULL || pObj->signature != pObj) { - terrno = TSDB_CODE_TSC_DISCONNECTED; - return NULL; - } - - if (sqlLen > tsMaxSQLStringLen) { - tscError("sql string exceeds max length:%d", tsMaxSQLStringLen); - terrno = TSDB_CODE_TSC_INVALID_SQL; - return NULL; - } - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); if (pSql == NULL) { tscError("failed to malloc sqlObj"); @@ -287,6 +260,11 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { tsem_wait(&pSql->rspSem); return pSql; } + +TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) { + return taos_query_c(taos, sqlstr, strlen(sqlstr)); +} + int taos_result_precision(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; if (pSql == NULL || pSql->signature != pSql) return 0; @@ -422,7 +400,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { pCmd->command == TSDB_SQL_INSERT) { return NULL; } - + + // set the sql object owner + tscSetSqlOwner(pSql); + // current data set are exhausted, fetch more data from node if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) && (pCmd->command == TSDB_SQL_RETRIEVE || @@ -441,7 +422,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { sem_wait(&pSql->rspSem); } - return doSetResultRowData(pSql, true); + void* data = doSetResultRowData(pSql, true); + + tscClearSqlOwner(pSql); + return data; } int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { @@ -509,7 +493,7 @@ int taos_select_db(TAOS *taos, const char *db) { } // send free message to vnode to free qhandle and corresponding resources in vnode -static bool tscFreeQhandleInVnode(SSqlObj* pSql) { +static bool tscKillQueryInVnode(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; @@ -557,16 +541,14 @@ void taos_free_result(TAOS_RES *res) { } pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; - if (!tscFreeQhandleInVnode(pSql)) { + if (!tscKillQueryInVnode(pSql)) { tscFreeSqlObj(pSql); tscDebug("%p sqlObj is freed by app", pSql); } } -// todo should not be used in async query int taos_errno(TAOS_RES *tres) { SSqlObj *pSql = (SSqlObj *) tres; - if (pSql == NULL || pSql->signature != pSql) { return terrno; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 5409d909398bfe5570aef25dd6145f078b3c81cc..fdc019e97b0b90f8848f4b0cbfb0f7ba644b820b 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2223,3 +2223,21 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second) { return 0; } + +bool tscSetSqlOwner(SSqlObj* pSql) { + SSqlRes* pRes = &pSql->res; + + // set the sql object owner + uint64_t threadId = taosGetPthreadId(); + if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) { + pRes->code = TSDB_CODE_QRY_IN_EXEC; + return false; + } + + return true; +} + +void tscClearSqlOwner(SSqlObj* pSql) { + assert(pSql->owner != 0); + atomic_store_64(&pSql->owner, 0); +} \ No newline at end of file