diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b6ab3702c939ffc01e2d0a2c2c5cce8ddf0f2a8f..8621f9d28bc90b22bb54bcdb8585c3db7a1bf429 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -80,6 +80,8 @@ enum { DATA_FROM_DATA_FILE = 2, }; +typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); + typedef struct STableComInfo { uint8_t numOfTags; uint8_t precision; @@ -226,7 +228,7 @@ typedef struct STableDataBlocks { typedef struct SQueryInfo { int16_t command; // the command may be different for each subclause, so keep it seperately. uint32_t type; // query/insert type - // TODO refactor + STimeWindow window; // query time window SInterval interval; @@ -440,19 +442,20 @@ void tscPartiallyFreeSqlObj(SSqlObj *pSql); * @param pObj */ void tscFreeSqlObj(SSqlObj *pSql); - -void tscFreeSqlObjInCache(void *pSql); +void tscFreeRegisteredSqlObj(void *pSql); void tscCloseTscObj(STscObj *pObj); +// todo move to taos? or create a new file: taos_internal.h TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos); -void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ; +TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, TAOS_RES** res); -void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); +void waitForQueryRsp(void *param, TAOS_RES *tres, int code); + +void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen); void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql); -void tscKillSTableQuery(SSqlObj *pSql); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); bool tscIsUpdateQuery(SSqlObj* pSql); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); @@ -517,8 +520,6 @@ extern SRpcCorEpSet tscMgmtEpSet; extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); -typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); - int32_t tscCompareTidTags(const void* p1, const void* p2); void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index a76b77bb865dfd88ff35406ac5a4d2538e35d7d8..639de294e623d7f3266eb716a40ff67a741e3707 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); -void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) { +void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) { SSqlCmd* pCmd = &pSql->cmd; pSql->signature = pSql; diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index bcd52d3a4295ebaa5d84b322de9809e10ab23901..bae0f91dcc40e78c3d2f707ff93ee6ebd5322339 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -151,10 +151,12 @@ void tscKillQuery(STscObj *pObj, uint32_t killId) { pthread_mutex_unlock(&pObj->mutex); - if (pSql == NULL) return; - - tscDebug("%p query is killed, queryId:%d", pSql, killId); - taos_stop_query(pSql); + if (pSql == NULL) { + tscError("failed to kill query, id:%d, it may have completed/terminated", killId); + } else { + tscDebug("%p query is killed, queryId:%d", pSql, killId); + taos_stop_query(pSql); + } } void tscAddIntoStreamList(SSqlStream *pStream) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1161bae424485ad67cb593c21706f3d69a2f6252..494a8a9c301175352c399317c83324f364a31eb6 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -467,45 +467,6 @@ int tscProcessSql(SSqlObj *pSql) { return doProcessSql(pSql); } -void tscKillSTableQuery(SSqlObj *pSql) { - SSqlCmd* pCmd = &pSql->cmd; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) { - return; - } - - // set the master sqlObj flag to cancel query - pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - - for (int i = 0; i < pSql->subState.numOfSub; ++i) { - // NOTE: pSub may have been released already here - SSqlObj *pSub = pSql->pSubs[i]; - if (pSub == NULL) { - continue; - } - - void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); - if (p == NULL) { - continue; - } - - SSqlObj* pSubObj = (SSqlObj*) (*p); - assert(pSubObj->self == (SSqlObj**) p); - - pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; - if (pSubObj->pRpcCtx != NULL) { - rpcCancelRequest(pSubObj->pRpcCtx); - pSubObj->pRpcCtx = NULL; - } - -// tscQueueAsyncRes(pSubObj); // async res? not other functions? - taosCacheRelease(tscObjCache, (void**) &p, false); - } - - tscDebug("%p super table query cancelled", pSql); -} - int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload; pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index f73b4abb159852a4db1e8ff52e07d0965b83c821..e7369e002b5de1a3df02285e2b8862fbb09633a8 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -307,7 +307,7 @@ static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) { tsem_post(&pSql->rspSem); } -TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { +TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen, TAOS_RES** res) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { terrno = TSDB_CODE_TSC_DISCONNECTED; @@ -332,12 +332,20 @@ TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) { tsem_init(&pSql->rspSem, 0, 0); doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); + if (res != NULL) { + *res = pSql; + } + tsem_wait(&pSql->rspSem); return pSql; } TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) { - return taos_query_c(taos, sqlstr, (uint32_t)strlen(sqlstr)); + return taos_query_c(taos, sqlstr, (uint32_t)strlen(sqlstr), NULL); +} + +TAOS_RES* taos_query_h(TAOS* taos, const char *sqlstr, TAOS_RES** res) { + return taos_query_c(taos, sqlstr, (uint32_t) strlen(sqlstr), res); } int taos_result_precision(TAOS_RES *res) { @@ -689,6 +697,45 @@ int* taos_fetch_lengths(TAOS_RES *res) { char *taos_get_client_info() { return version; } +static void tscKillSTableQuery(SSqlObj *pSql) { + SSqlCmd* pCmd = &pSql->cmd; + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) { + return; + } + + // set the master sqlObj flag to cancel query + pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + + for (int i = 0; i < pSql->subState.numOfSub; ++i) { + // NOTE: pSub may have been released already here + SSqlObj *pSub = pSql->pSubs[i]; + if (pSub == NULL) { + continue; + } + + void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE)); + if (p == NULL) { + continue; + } + + SSqlObj* pSubObj = (SSqlObj*) (*p); + assert(pSubObj->self == (SSqlObj**) p); + + pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + if (pSubObj->pRpcCtx != NULL) { + rpcCancelRequest(pSubObj->pRpcCtx); + pSubObj->pRpcCtx = NULL; + } + + tscQueueAsyncRes(pSubObj); + taosCacheRelease(tscObjCache, (void**) &p, false); + } + + tscDebug("%p super table query cancelled", pSql); +} + void taos_stop_query(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; if (pSql == NULL || pSql->signature != pSql) { @@ -698,19 +745,20 @@ void taos_stop_query(TAOS_RES *res) { tscDebug("%p start to cancel query", res); SSqlCmd *pCmd = &pSql->cmd; - // TODO there are multi-thread problem. - // It may have been released by the other thread already. - // The ref count may fix this problem. - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - // set the error code for master pSqlObj firstly pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { assert(pSql->pRpcCtx == NULL); tscKillSTableQuery(pSql); } else { if (pSql->cmd.command < TSDB_SQL_LOCAL) { + /* + * There is multi-thread problem here, since pSql->pRpcCtx may have been + * reset and freed in the processMsgFromServer function, and causes the invalid + * write problem for rpcCancelRequest. + */ if (pSql->pRpcCtx != NULL) { rpcCancelRequest(pSql->pRpcCtx); pSql->pRpcCtx = NULL; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 185d6784d1ca3457016ed8fa2c80ff63daf2b8a3..5ccb2aee8c62a8ad200115c6ca5b534f6a357275 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1563,6 +1563,11 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in } void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { + // it has been freed already + if (pSql->param != trsupport || pSql->param == NULL) { + return; + } + SSqlObj *pParentSql = trsupport->pParentSql; int32_t subqueryIndex = trsupport->subqueryIndex; @@ -1709,14 +1714,21 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p } static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { + SSqlObj *pSql = (SSqlObj *)tres; + assert(pSql != NULL); + + // this query has been freed already SRetrieveSupport *trsupport = (SRetrieveSupport *)param; + if (pSql->param == NULL || param == NULL) { + tscDebug("%p already freed in dnodecallback", pSql); + assert(pSql->res.code == TSDB_CODE_TSC_QUERY_CANCELLED); + return; + } + tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; int32_t idx = trsupport->subqueryIndex; SSqlObj * pParentSql = trsupport->pParentSql; - SSqlObj *pSql = (SSqlObj *)tres; - assert(pSql != NULL && trsupport == pSql->param); - SSubqueryState* pState = &pParentSql->subState; assert(pState->numOfRemain <= pState->numOfSub && pState->numOfRemain >= 0); diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index dd62df170a7c87f127eb7c52c1f580b7f460b445..765181dbba5c92ca9dc80adb7376e68c70237d6d 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -20,6 +20,7 @@ #include "taos.h" #include "taosdef.h" #include "stdbool.h" +#include "tsclient.h" #define MAX_USERNAME_SIZE 64 #define MAX_DBNAME_SIZE 64 diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index d5e826fbaabfbf76dfac0256935f3222b53cd70a..24388bf50c7a215920a58e79f8d33ab81752c547 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -294,9 +294,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { st = taosGetTimestampUs(); - TAOS_RES* pSql = taos_query(con, command); - atomic_store_ptr(&result, pSql); // set the global TAOS_RES pointer - + TAOS_RES* pSql = taos_query_h(con, command, &result); if (taos_errno(pSql)) { taos_error(pSql); return;