diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index f5576fe37281390d85599c81b509db096ab3c99b..99b722e660bdb47c9bca23efd638e81ea3b03375 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -402,7 +402,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet); int tscProcessSql(SSqlObj *pSql); int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex); -void tscQueueAsyncRes(SSqlObj *pSql); +void tscAsyncResultOnError(SSqlObj *pSql); void tscQueueAsyncError(void(*fp), void *param, int32_t code); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 54017de8aee6a0402523103cc1e48e2b6254af72..96aeb9d60de1ab6fbaeebcb54e2da1ab316179f8 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -56,7 +56,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return; } @@ -70,7 +70,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para if (code != TSDB_CODE_SUCCESS) { pSql->res.code = code; - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return; } @@ -165,7 +165,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo pRes->code = numOfRows; } - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return; } @@ -216,7 +216,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; pSql->param = param; - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return; } @@ -279,7 +279,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), pSql->param = param; pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return; } @@ -381,7 +381,7 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) { } -void tscQueueAsyncRes(SSqlObj *pSql) { +void tscAsyncResultOnError(SSqlObj *pSql) { if (pSql == NULL || pSql->signature != pSql) { tscDebug("%p SqlObj is freed, not add into queue async res", pSql); return; @@ -531,6 +531,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { _error: if (code != TSDB_CODE_SUCCESS) { pSql->res.code = code; - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); } } diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index c2f2dda1af9f81c46fd4c73fe3646e22de290f4d..16bbd420c0cd77bd3f94ce64d840ccda36a36eb6 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -272,7 +272,7 @@ void tscSCreateCallBack(void *param, TAOS_RES *tres, int code) { if (pRes->code != TSDB_CODE_SUCCESS) { taos_free_result(pSql); free(builder); - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); return; } @@ -290,7 +290,7 @@ void tscSCreateCallBack(void *param, TAOS_RES *tres, int code) { if (pRes->code == TSDB_CODE_SUCCESS) { (*pParentSql->fp)(pParentSql->param, pParentSql, code); } else { - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); } } } @@ -924,7 +924,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) { (*pSql->fp)(pSql->param, pSql, code); } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS){ } else { - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); } return code; } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 5d7a13675fda001c47384b124165d7d2599d7b1f..ec90d21394a5dce657652fb9e6c2ca81dd2f120f 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1429,7 +1429,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { fclose(fp); pParentSql->res.code = code; - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); return; } while (0); } @@ -1500,7 +1500,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { code = doPackSendDataBlock(pSql, count, pTableDataBlock); if (code != TSDB_CODE_SUCCESS) { pParentSql->res.code = code; - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); return; } @@ -1535,7 +1535,7 @@ void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) { tscError("%p failed to open file %s to load data from file, code:%s", pSql, pCmd->payload, tstrerror(pSql->res.code)); tfree(pSupporter); - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a41ece923a5427a380254bd2dcc3065a69b70000..7d5d71bac3d9a8046f3bda38eae1f64fc4ccb944 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -439,7 +439,7 @@ int doProcessSql(SSqlObj *pSql) { } if (pRes->code != TSDB_CODE_SUCCESS) { - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return pRes->code; } @@ -448,7 +448,7 @@ int doProcessSql(SSqlObj *pSql) { // NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads. if (code != TSDB_CODE_SUCCESS) { pRes->code = code; - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return code; } @@ -1528,7 +1528,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) { if (code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows); } else { - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); } } @@ -1557,7 +1557,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { int32_t code = pRes->code; if (pRes->code != TSDB_CODE_SUCCESS) { - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return code; } @@ -1576,7 +1576,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { if (pRes->code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, pRes->numOfRows); } else { - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); } return code; @@ -2357,7 +2357,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create } /** - * retrieve table meta from mnode, and update the local table meta cache. + * retrieve table meta from mnode, and update the local table meta hashmap. * @param pSql sql object * @param tableIndex table index * @return status code @@ -2365,16 +2365,18 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); + const char* name = pTableMetaInfo->name; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - if (pTableMetaInfo->pTableMeta) { - tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql, - tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta); + if (pTableMeta) { + tscDebug("%p update table meta:%s, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64, pSql, name, + tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid); } - taosHashRemove(tscTableMetaInfo, pTableMetaInfo->name, strnlen(pTableMetaInfo->name, TSDB_TABLE_FNAME_LEN)); + // remove stored tableMeta info in hash table + taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); return getTableMetaFromMnode(pSql, pTableMetaInfo); } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index b86d5851bf9879ffa9b74c7a5b9e8e76377ff1d7..377cb24b1d5b1c92c702cd113c4feea185617911 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -709,7 +709,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { pSubObj->rpcRid = -1; } - tscQueueAsyncRes(pSubObj); + tscAsyncResultOnError(pSubObj); taosReleaseRef(tscObjRef, pSubObj->self); } @@ -745,7 +745,7 @@ void taos_stop_query(TAOS_RES *res) { pSql->rpcRid = -1; } - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); } } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 8e6dbe29a67689f0f6cf8218495fd57bd5990df0..681291d0db7039af09147d86f6db1fa48ccb233c 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -779,7 +779,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = numOfRows; quitAllSubquery(pParentSql, pSupporter); - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); return; } @@ -796,7 +796,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); quitAllSubquery(pParentSql, pSupporter); - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); return; } @@ -845,7 +845,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow if (code != TSDB_CODE_SUCCESS) { freeJoinSubqueryObj(pParentSql); pParentSql->res.code = code; - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); taosArrayDestroy(s1); taosArrayDestroy(s2); @@ -916,7 +916,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow pParentSql->res.code = numOfRows; quitAllSubquery(pParentSql, pSupporter); - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); return; } @@ -930,7 +930,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows); pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); return; } @@ -1028,7 +1028,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR pParentSql->res.code = numOfRows; tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows)); - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); return; } @@ -1155,7 +1155,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { if (pSql->res.code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, 0); } else { - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); } return; @@ -1233,7 +1233,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { if (pSql->res.code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, 0); } else { - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); } return; @@ -1344,7 +1344,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code); quitAllSubquery(pParentSql, pSupporter); - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); return; } @@ -1357,7 +1357,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { pParentSql->res.code = code; quitAllSubquery(pParentSql, pSupporter); - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); return; } @@ -1403,7 +1403,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (pParentSql->res.code == TSDB_CODE_SUCCESS) { (*pParentSql->fp)(pParentSql->param, pParentSql, 0); } else { - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); } } } @@ -1612,7 +1612,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { _error: pRes->code = code; - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); } static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { @@ -1666,7 +1666,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize); if (ret != 0) { pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); tfree(pMemoryBuf); return ret; } @@ -1680,7 +1680,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel,pState->numOfSub); - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return ret; } @@ -1890,7 +1890,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO (*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code); } else { // regular super table query if (pParentSql->res.code != TSDB_CODE_SUCCESS) { - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); } } } @@ -1968,7 +1968,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p if (pParentSql->res.code == TSDB_CODE_SUCCESS) { (*pParentSql->fp)(pParentSql->param, pParentSql, 0); } else { - tscQueueAsyncRes(pParentSql); + tscAsyncResultOnError(pParentSql); } } @@ -2220,7 +2220,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) (*pParentObj->fp)(pParentObj->param, pParentObj, v); } else { if (!needRetryInsert(pParentObj, numOfSub)) { - tscQueueAsyncRes(pParentObj); + tscAsyncResultOnError(pParentObj); return; } @@ -2265,7 +2265,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) if (code != TSDB_CODE_SUCCESS) { pParentObj->res.code = code; - tscQueueAsyncRes(pParentObj); + tscAsyncResultOnError(pParentObj); return; } @@ -2289,7 +2289,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) { int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return code; // here the pSql may have been released already. } @@ -2482,7 +2482,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { SSqlRes* pRes = &pSql->res; if (pRes->code != TSDB_CODE_SUCCESS) { - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return; } @@ -2497,7 +2497,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { if (pRes->tsrow == NULL || pRes->buffer == NULL || pRes->length == NULL) { pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); return; } @@ -2509,7 +2509,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { if (pRes->code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, pRes->numOfRows); } else { - tscQueueAsyncRes(pSql); + tscAsyncResultOnError(pSql); } }