From 3104479733d719f27f5562aa039d4786806c813c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 26 Apr 2021 21:50:16 +0800 Subject: [PATCH] [td-225]1) fix bug found by regression test; 2) refactor code. --- src/client/inc/tsclient.h | 2 +- src/client/src/tscAsync.c | 12 ++++++------ src/client/src/tscLocalMerge.c | 1 + src/client/src/tscParseInsert.c | 2 +- src/client/src/tscPrepare.c | 2 +- src/client/src/tscServer.c | 32 +++++++++++++++++--------------- src/client/src/tscSql.c | 10 +++++----- src/client/src/tscStream.c | 3 ++- src/client/src/tscSub.c | 5 ++++- src/client/src/tscSubquery.c | 28 ++++++++++++++-------------- src/client/src/tscUtil.c | 18 ++++++++++++------ 11 files changed, 64 insertions(+), 51 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 52c5a32941..ec3b0c4421 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -423,7 +423,7 @@ void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool initial); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet); -int tscProcessSql(SSqlObj *pSql, SQueryInfo* pQueryInfo); +int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo); int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex); void tscAsyncResultOnError(SSqlObj *pSql); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index a972f194c5..71c960b454 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -180,7 +180,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { tscFetchDatablockForSubquery(pSql); } else { - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); } } @@ -256,7 +256,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { } SQueryInfo* pQueryInfo1 = tscGetActiveQueryInfo(&pSql->cmd); - tscProcessSql(pSql, pQueryInfo1); + tscBuildAndSendRequest(pSql, pQueryInfo1); } } @@ -396,8 +396,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { goto _error; } - // tscProcessSql can add error into async res - tscProcessSql(pSql, NULL); + // tscBuildAndSendRequest can add error into async res + tscBuildAndSendRequest(pSql, NULL); taosReleaseRef(tscObjRef, pSql->self); return; } else { // continue to process normal async query @@ -428,9 +428,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { goto _error; } - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); } else { // in all other cases, simple retry - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); } taosReleaseRef(tscObjRef, pSql->self); diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 9dc0369dda..f96ced8850 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -1051,6 +1051,7 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { } } + pOperator->status = OP_EXEC_DONE; return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL; } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index f75c7f036b..68c974359c 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1381,7 +1381,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock return code; } - return tscProcessSql(pSql, NULL); + return tscBuildAndSendRequest(pSql, NULL); } typedef struct SImportFileSupport { diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 05fa264179..c3c8986e2f 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -815,7 +815,7 @@ static int insertStmtExecute(STscStmt* stmt) { pRes->numOfRows = 0; pRes->numOfTotal = 0; - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); // wait for the callback function to post the semaphore tsem_wait(&pSql->rspSem); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 7aba1d4c0c..0a9a109932 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -270,7 +270,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { assert(pHB->self == pObj->hbrid); pHB->retry = 0; - int32_t code = tscProcessSql(pHB, NULL); + int32_t code = tscBuildAndSendRequest(pHB, NULL); taosReleaseRef(tscObjRef, pObj->hbrid); if (code != TSDB_CODE_SUCCESS) { @@ -467,7 +467,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { rpcFreeCont(rpcMsg->pCont); } -int doProcessSql(SSqlObj *pSql) { +int doBuildAndSendMsg(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -499,7 +499,7 @@ int doProcessSql(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -int tscProcessSql(SSqlObj *pSql, SQueryInfo* pQueryInfo) { +int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) { char name[TSDB_TABLE_FNAME_LEN] = {0}; SSqlCmd *pCmd = &pSql->cmd; @@ -533,7 +533,7 @@ int tscProcessSql(SSqlObj *pSql, SQueryInfo* pQueryInfo) { return (*tscProcessMsgRsp[pCmd->command])(pSql); } - return doProcessSql(pSql); + return doBuildAndSendMsg(pSql); } int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { @@ -816,6 +816,7 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSqlCmd *pCmd = &pSql->cmd; + int32_t code = TSDB_CODE_SUCCESS; int32_t size = tscEstimateQueryMsgSize(pSql, pCmd->clauseIndex); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { @@ -907,16 +908,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } for (int32_t i = 0; i < query.numOfOutput; ++i) { - int32_t code = serializeSqlExpr(&query.pExpr1[i].base, pTableMetaInfo, &pMsg, pSql); + code = serializeSqlExpr(&query.pExpr1[i].base, pTableMetaInfo, &pMsg, pSql); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _end; } } for (int32_t i = 0; i < query.numOfExpr2; ++i) { - int32_t code = serializeSqlExpr(&query.pExpr2[i].base, pTableMetaInfo, &pMsg, pSql); + code = serializeSqlExpr(&query.pExpr2[i].base, pTableMetaInfo, &pMsg, pSql); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _end; } } @@ -925,7 +926,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // serialize the table info (sid, uid, tags) pMsg = doSerializeTableInfo(pQueryMsg, pSql, pTableMetaInfo, pMsg, &succeed); if (succeed == 0) { - return TSDB_CODE_TSC_APP_ERROR; + code = TSDB_CODE_TSC_APP_ERROR; + goto _end; } SSqlGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr; @@ -1001,9 +1003,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { if (pQueryInfo->tsBuf != NULL) { // note: here used the index instead of actual vnode id. int32_t vnodeIndex = pTableMetaInfo->vgroupIndex; - int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsBuf.tsLen, &pQueryMsg->tsBuf.tsNumOfBlocks); + code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsBuf.tsLen, &pQueryMsg->tsBuf.tsNumOfBlocks); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _end; } pMsg += pQueryMsg->tsBuf.tsLen; @@ -1034,11 +1036,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->head.contLen = htonl(msgLen); assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize); + _end: freeQueryAttr(&query); taosArrayDestroy(tableScanOperator); taosArrayDestroy(queryOperator); - - return TSDB_CODE_SUCCESS; + return code; } int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { @@ -2430,7 +2432,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn pSql->metaRid = pNew->self; - int32_t code = tscProcessSql(pNew, NULL); + int32_t code = tscBuildAndSendRequest(pNew, NULL); if (code == TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify application that current process needs to be terminated } @@ -2566,7 +2568,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { pNew->fp = tscTableMetaCallBack; pNew->param = (void *)pSql->self; - code = tscProcessSql(pNew, NULL); + code = tscBuildAndSendRequest(pNew, NULL); if (code == TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 2c960ecefd..9298d1fd84 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -191,7 +191,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, pSql->fp = syncConnCallback; pSql->param = pSql; - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); tsem_wait(&pSql->rspSem); if (pSql->res.code != TSDB_CODE_SUCCESS) { @@ -265,7 +265,7 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, if (taos) *taos = pObj; pSql->fetchFp = fp; - pSql->res.code = tscProcessSql(pSql, NULL); + pSql->res.code = tscBuildAndSendRequest(pSql, NULL); tscDebug("%p DB async connection is opening", taos); return pObj; } @@ -578,7 +578,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; tscDebug("0x%"PRIx64" send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql->self, sqlCmd[pCmd->command]); - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); return false; } @@ -1048,7 +1048,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { /* * set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query. - * If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscProcessSql() + * If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscBuildAndSendRequest() * to free connection, which may cause segment fault, when the parse phrase is not even successfully executed. */ pRes->qId = 0; @@ -1061,7 +1061,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { tscDoQuery(pSql); - tscDebug("0x%"PRIx64" load multi table meta result:%d %s pObj:%p", pSql->self, pRes->code, taos_errstr(pSql), pObj); + tscDebug("0x%"PRIx64" load multi-table meta result:%d %s pObj:%p", pSql->self, pRes->code, taos_errstr(pSql), pObj); if ((code = pRes->code) != TSDB_CODE_SUCCESS) { tscFreeSqlObj(pSql); } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 2fc857e4c7..2551662d93 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -113,10 +113,11 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) { tscDebug("0x%"PRIx64" stream:%p, start stream query on:%s", pSql->self, pStream, tNameGetTableName(&pTableMetaInfo->name)); pQueryInfo->command = TSDB_SQL_SELECT; + pSql->cmd.active = pQueryInfo; pSql->fp = tscProcessStreamQueryCallback; pSql->fetchFp = tscProcessStreamQueryCallback; - tscDoQuery(pSql); + executeQuery(pSql, pQueryInfo); tscIncStreamExecutionCount(pStream); } else { setRetryInfo(pStream, code); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 05d3cedf35..a32cadb907 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -557,7 +557,10 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { pSql->fp = asyncCallback; pSql->fetchFp = asyncCallback; pSql->param = pSub; - tscDoQuery(pSql); + + pSql->cmd.active = pQueryInfo; + executeQuery(pSql, pQueryInfo); + tsem_wait(&pSub->sem); if (pRes->code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 79fb407069..23f3c2046d 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -855,7 +855,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type, tscSqlExprNumOfExprs(pQueryInfo), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); } static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSqlObj* pPSqlObj) { @@ -1176,7 +1176,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // set the callback function pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); return; } @@ -1360,7 +1360,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow // set the callback function pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); return; } @@ -1445,7 +1445,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR pSql->cmd.command = TSDB_SQL_SELECT; pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); return; } else { tscDebug("0x%"PRIx64" no result in current subquery anymore", pSql->self); @@ -1605,7 +1605,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { pSub->cmd.command = TSDB_SQL_SELECT; pSub->fp = tscJoinQueryCallback; - tscProcessSql(pSub, NULL); + tscBuildAndSendRequest(pSub, NULL); tryNextVnode = true; } else { tscDebug("0x%"PRIx64" no result in current subquery anymore", pSub->self); @@ -1675,7 +1675,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) { pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } - tscProcessSql(pSql1, NULL); + tscBuildAndSendRequest(pSql1, NULL); } } } @@ -1775,7 +1775,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) { pSql->fp = tidTagRetrieveCallback; pSql->cmd.command = TSDB_SQL_FETCH; - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); return; } @@ -1783,7 +1783,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { pSql->fp = tsCompRetrieveCallback; pSql->cmd.command = TSDB_SQL_FETCH; - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); return; } @@ -1804,7 +1804,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); } else { // first retrieve from vnode during the secondary stage sub-query // set the command flag must be after the semaphore been correctly set. if (pParentSql->res.code == TSDB_CODE_SUCCESS) { @@ -2021,7 +2021,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) { continue; } - if ((code = tscProcessSql(pSub, NULL)) != TSDB_CODE_SUCCESS) { + if ((code = tscBuildAndSendRequest(pSub, NULL)) != TSDB_CODE_SUCCESS) { pRes->code = code; (*pSub->fp)(pSub->param, pSub, 0); fail = 1; @@ -2531,7 +2531,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SRetrieveSupport* pSupport = pSub->param; tscDebug("0x%"PRIx64" sub:%p launch subquery, orderOfSub:%d.", pSql->self, pSub, pSupport->subqueryIndex); - tscProcessSql(pSub, NULL); + tscBuildAndSendRequest(pSub, NULL); } return TSDB_CODE_SUCCESS; @@ -2611,7 +2611,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32 return pParentSql->res.code; } - int32_t ret = tscProcessSql(pNew, NULL); + int32_t ret = tscBuildAndSendRequest(pNew, NULL); *sent = 1; @@ -3123,7 +3123,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) { return code; // here the pSql may have been released already. } - return tscProcessSql(pSql, NULL); + return tscBuildAndSendRequest(pSql, NULL); } int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { @@ -3222,7 +3222,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { for (int32_t j = 0; j < numOfSub; ++j) { SSqlObj *pSub = pSql->pSubs[j]; tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j); - tscProcessSql(pSub, NULL); + tscBuildAndSendRequest(pSub, NULL); } return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 86640dfbcd..e35fba9143 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2777,7 +2777,7 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { } else if (pSql->cmd.command > TSDB_SQL_LOCAL) { tscProcessLocalCmd(pSql); } else { // send request to server directly - tscProcessSql(pSql, pQueryInfo); + tscBuildAndSendRequest(pSql, pQueryInfo); } } @@ -2788,6 +2788,10 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { return; } + if (pSql->cmd.command == TSDB_SQL_SELECT) { + tscAddIntoSqlList(pSql); + } + if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly SQueryInfo* pq = taosArrayGetP(pQueryInfo->pUpstream, 0); @@ -2805,6 +2809,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { } /** + * todo remove it * To decide if current is a two-stage super table query, join query, or insert. And invoke different * procedure accordingly * @param pSql @@ -2835,14 +2840,14 @@ void tscDoQuery(SSqlObj* pSql) { tscHandleMasterJoinQuery(pSql); } else { // for first stage sub query, iterate all vnodes to get all timestamp if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); } else { // secondary stage join query. if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query tscLockByThread(&pSql->squeryLock); tscHandleMasterSTableQuery(pSql); tscUnlockByThread(&pSql->squeryLock); } else { - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); } } } @@ -2857,8 +2862,9 @@ void tscDoQuery(SSqlObj* pSql) { tscUnlockByThread(&pSql->squeryLock); return; } - - tscProcessSql(pSql, NULL); + + pCmd->active = pQueryInfo; + tscBuildAndSendRequest(pSql, NULL); } } @@ -3074,7 +3080,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { // set the callback function pSql->fp = fp; - tscProcessSql(pSql, NULL); + tscBuildAndSendRequest(pSql, NULL); } else { tscDebug("0x%"PRIx64" try all %d vnodes, query complete. current numOfRes:%" PRId64, pSql->self, totalVgroups, pRes->numOfClauseTotal); } -- GitLab