提交 31044797 编写于 作者: H Haojun Liao

[td-225]1) fix bug found by regression test; 2) refactor code.

上级 47222b65
......@@ -423,7 +423,7 @@ void tscInitMsgsFp();
int tsParseSql(SSqlObj *pSql, bool initial);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
int tscProcessSql(SSqlObj *pSql, SQueryInfo* pQueryInfo);
int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo);
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex);
void tscAsyncResultOnError(SSqlObj *pSql);
......
......@@ -180,7 +180,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockForSubquery(pSql);
} else {
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
}
}
......@@ -256,7 +256,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
}
SQueryInfo* pQueryInfo1 = tscGetActiveQueryInfo(&pSql->cmd);
tscProcessSql(pSql, pQueryInfo1);
tscBuildAndSendRequest(pSql, pQueryInfo1);
}
}
......@@ -396,8 +396,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error;
}
// tscProcessSql can add error into async res
tscProcessSql(pSql, NULL);
// tscBuildAndSendRequest can add error into async res
tscBuildAndSendRequest(pSql, NULL);
taosReleaseRef(tscObjRef, pSql->self);
return;
} else { // continue to process normal async query
......@@ -428,9 +428,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error;
}
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
} else { // in all other cases, simple retry
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
}
taosReleaseRef(tscObjRef, pSql->self);
......
......@@ -1051,6 +1051,7 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
}
}
pOperator->status = OP_EXEC_DONE;
return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL;
}
......
......@@ -1381,7 +1381,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return code;
}
return tscProcessSql(pSql, NULL);
return tscBuildAndSendRequest(pSql, NULL);
}
typedef struct SImportFileSupport {
......
......@@ -815,7 +815,7 @@ static int insertStmtExecute(STscStmt* stmt) {
pRes->numOfRows = 0;
pRes->numOfTotal = 0;
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
......
......@@ -270,7 +270,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
assert(pHB->self == pObj->hbrid);
pHB->retry = 0;
int32_t code = tscProcessSql(pHB, NULL);
int32_t code = tscBuildAndSendRequest(pHB, NULL);
taosReleaseRef(tscObjRef, pObj->hbrid);
if (code != TSDB_CODE_SUCCESS) {
......@@ -467,7 +467,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
rpcFreeCont(rpcMsg->pCont);
}
int doProcessSql(SSqlObj *pSql) {
int doBuildAndSendMsg(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -499,7 +499,7 @@ int doProcessSql(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
int tscProcessSql(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
SSqlCmd *pCmd = &pSql->cmd;
......@@ -533,7 +533,7 @@ int tscProcessSql(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
return (*tscProcessMsgRsp[pCmd->command])(pSql);
}
return doProcessSql(pSql);
return doBuildAndSendMsg(pSql);
}
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
......@@ -816,6 +816,7 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo,
int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS;
int32_t size = tscEstimateQueryMsgSize(pSql, pCmd->clauseIndex);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
......@@ -907,16 +908,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
for (int32_t i = 0; i < query.numOfOutput; ++i) {
int32_t code = serializeSqlExpr(&query.pExpr1[i].base, pTableMetaInfo, &pMsg, pSql);
code = serializeSqlExpr(&query.pExpr1[i].base, pTableMetaInfo, &pMsg, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _end;
}
}
for (int32_t i = 0; i < query.numOfExpr2; ++i) {
int32_t code = serializeSqlExpr(&query.pExpr2[i].base, pTableMetaInfo, &pMsg, pSql);
code = serializeSqlExpr(&query.pExpr2[i].base, pTableMetaInfo, &pMsg, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _end;
}
}
......@@ -925,7 +926,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// serialize the table info (sid, uid, tags)
pMsg = doSerializeTableInfo(pQueryMsg, pSql, pTableMetaInfo, pMsg, &succeed);
if (succeed == 0) {
return TSDB_CODE_TSC_APP_ERROR;
code = TSDB_CODE_TSC_APP_ERROR;
goto _end;
}
SSqlGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr;
......@@ -1001,9 +1003,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pQueryInfo->tsBuf != NULL) {
// note: here used the index instead of actual vnode id.
int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
int32_t code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsBuf.tsLen, &pQueryMsg->tsBuf.tsNumOfBlocks);
code = dumpFileBlockByGroupId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsBuf.tsLen, &pQueryMsg->tsBuf.tsNumOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _end;
}
pMsg += pQueryMsg->tsBuf.tsLen;
......@@ -1034,11 +1036,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->head.contLen = htonl(msgLen);
assert(msgLen + minMsgSize() <= (int32_t)pCmd->allocSize);
_end:
freeQueryAttr(&query);
taosArrayDestroy(tableScanOperator);
taosArrayDestroy(queryOperator);
return TSDB_CODE_SUCCESS;
return code;
}
int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
......@@ -2430,7 +2432,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
pSql->metaRid = pNew->self;
int32_t code = tscProcessSql(pNew, NULL);
int32_t code = tscBuildAndSendRequest(pNew, NULL);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify application that current process needs to be terminated
}
......@@ -2566,7 +2568,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
pNew->fp = tscTableMetaCallBack;
pNew->param = (void *)pSql->self;
code = tscProcessSql(pNew, NULL);
code = tscBuildAndSendRequest(pNew, NULL);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
}
......
......@@ -191,7 +191,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
pSql->fp = syncConnCallback;
pSql->param = pSql;
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
tsem_wait(&pSql->rspSem);
if (pSql->res.code != TSDB_CODE_SUCCESS) {
......@@ -265,7 +265,7 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
if (taos) *taos = pObj;
pSql->fetchFp = fp;
pSql->res.code = tscProcessSql(pSql, NULL);
pSql->res.code = tscBuildAndSendRequest(pSql, NULL);
tscDebug("%p DB async connection is opening", taos);
return pObj;
}
......@@ -578,7 +578,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscDebug("0x%"PRIx64" send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql->self, sqlCmd[pCmd->command]);
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
return false;
}
......@@ -1048,7 +1048,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
/*
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
* If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscProcessSql()
* If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscBuildAndSendRequest()
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/
pRes->qId = 0;
......@@ -1061,7 +1061,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
tscDoQuery(pSql);
tscDebug("0x%"PRIx64" load multi table meta result:%d %s pObj:%p", pSql->self, pRes->code, taos_errstr(pSql), pObj);
tscDebug("0x%"PRIx64" load multi-table meta result:%d %s pObj:%p", pSql->self, pRes->code, taos_errstr(pSql), pObj);
if ((code = pRes->code) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
}
......
......@@ -113,10 +113,11 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
tscDebug("0x%"PRIx64" stream:%p, start stream query on:%s", pSql->self, pStream, tNameGetTableName(&pTableMetaInfo->name));
pQueryInfo->command = TSDB_SQL_SELECT;
pSql->cmd.active = pQueryInfo;
pSql->fp = tscProcessStreamQueryCallback;
pSql->fetchFp = tscProcessStreamQueryCallback;
tscDoQuery(pSql);
executeQuery(pSql, pQueryInfo);
tscIncStreamExecutionCount(pStream);
} else {
setRetryInfo(pStream, code);
......
......@@ -557,7 +557,10 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pSql->fp = asyncCallback;
pSql->fetchFp = asyncCallback;
pSql->param = pSub;
tscDoQuery(pSql);
pSql->cmd.active = pQueryInfo;
executeQuery(pSql, pQueryInfo);
tsem_wait(&pSub->sem);
if (pRes->code != TSDB_CODE_SUCCESS) {
......
......@@ -855,7 +855,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type,
tscSqlExprNumOfExprs(pQueryInfo), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
}
static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
......@@ -1176,7 +1176,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
return;
}
......@@ -1360,7 +1360,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
return;
}
......@@ -1445,7 +1445,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
return;
} else {
tscDebug("0x%"PRIx64" no result in current subquery anymore", pSql->self);
......@@ -1605,7 +1605,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
pSub->cmd.command = TSDB_SQL_SELECT;
pSub->fp = tscJoinQueryCallback;
tscProcessSql(pSub, NULL);
tscBuildAndSendRequest(pSub, NULL);
tryNextVnode = true;
} else {
tscDebug("0x%"PRIx64" no result in current subquery anymore", pSub->self);
......@@ -1675,7 +1675,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql1, NULL);
tscBuildAndSendRequest(pSql1, NULL);
}
}
}
......@@ -1775,7 +1775,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
pSql->fp = tidTagRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
return;
}
......@@ -1783,7 +1783,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
pSql->fp = tsCompRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
return;
}
......@@ -1804,7 +1804,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
} else { // first retrieve from vnode during the secondary stage sub-query
// set the command flag must be after the semaphore been correctly set.
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
......@@ -2021,7 +2021,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
continue;
}
if ((code = tscProcessSql(pSub, NULL)) != TSDB_CODE_SUCCESS) {
if ((code = tscBuildAndSendRequest(pSub, NULL)) != TSDB_CODE_SUCCESS) {
pRes->code = code;
(*pSub->fp)(pSub->param, pSub, 0);
fail = 1;
......@@ -2531,7 +2531,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
SRetrieveSupport* pSupport = pSub->param;
tscDebug("0x%"PRIx64" sub:%p launch subquery, orderOfSub:%d.", pSql->self, pSub, pSupport->subqueryIndex);
tscProcessSql(pSub, NULL);
tscBuildAndSendRequest(pSub, NULL);
}
return TSDB_CODE_SUCCESS;
......@@ -2611,7 +2611,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
return pParentSql->res.code;
}
int32_t ret = tscProcessSql(pNew, NULL);
int32_t ret = tscBuildAndSendRequest(pNew, NULL);
*sent = 1;
......@@ -3123,7 +3123,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
return code; // here the pSql may have been released already.
}
return tscProcessSql(pSql, NULL);
return tscBuildAndSendRequest(pSql, NULL);
}
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
......@@ -3222,7 +3222,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
for (int32_t j = 0; j < numOfSub; ++j) {
SSqlObj *pSub = pSql->pSubs[j];
tscDebug("0x%"PRIx64" sub:%p launch sub insert, orderOfSub:%d", pSql->self, pSub, j);
tscProcessSql(pSub, NULL);
tscBuildAndSendRequest(pSub, NULL);
}
return TSDB_CODE_SUCCESS;
......
......@@ -2777,7 +2777,7 @@ void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
} else if (pSql->cmd.command > TSDB_SQL_LOCAL) {
tscProcessLocalCmd(pSql);
} else { // send request to server directly
tscProcessSql(pSql, pQueryInfo);
tscBuildAndSendRequest(pSql, pQueryInfo);
}
}
......@@ -2788,6 +2788,10 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
return;
}
if (pSql->cmd.command == TSDB_SQL_SELECT) {
tscAddIntoSqlList(pSql);
}
if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
SQueryInfo* pq = taosArrayGetP(pQueryInfo->pUpstream, 0);
......@@ -2805,6 +2809,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
}
/**
* todo remove it
* To decide if current is a two-stage super table query, join query, or insert. And invoke different
* procedure accordingly
* @param pSql
......@@ -2835,14 +2840,14 @@ void tscDoQuery(SSqlObj* pSql) {
tscHandleMasterJoinQuery(pSql);
} else { // for first stage sub query, iterate all vnodes to get all timestamp
if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
} else { // secondary stage join query.
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
tscLockByThread(&pSql->squeryLock);
tscHandleMasterSTableQuery(pSql);
tscUnlockByThread(&pSql->squeryLock);
} else {
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
}
}
}
......@@ -2858,7 +2863,8 @@ void tscDoQuery(SSqlObj* pSql) {
return;
}
tscProcessSql(pSql, NULL);
pCmd->active = pQueryInfo;
tscBuildAndSendRequest(pSql, NULL);
}
}
......@@ -3074,7 +3080,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
// set the callback function
pSql->fp = fp;
tscProcessSql(pSql, NULL);
tscBuildAndSendRequest(pSql, NULL);
} else {
tscDebug("0x%"PRIx64" try all %d vnodes, query complete. current numOfRes:%" PRId64, pSql->self, totalVgroups, pRes->numOfClauseTotal);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册