提交 945e3466 编写于 作者: H Haojun Liao

[TD-225]refactor codes.

上级 897b0264
......@@ -402,7 +402,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
int tscProcessSql(SSqlObj *pSql);
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex);
void tscQueueAsyncRes(SSqlObj *pSql);
void tscAsyncResultOnError(SSqlObj *pSql);
void tscQueueAsyncError(void(*fp), void *param, int32_t code);
......
......@@ -56,7 +56,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -70,7 +70,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -165,7 +165,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
pRes->code = numOfRows;
}
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -216,7 +216,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
pSql->param = param;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -279,7 +279,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
pSql->param = param;
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -381,7 +381,7 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
}
void tscQueueAsyncRes(SSqlObj *pSql) {
void tscAsyncResultOnError(SSqlObj *pSql) {
if (pSql == NULL || pSql->signature != pSql) {
tscDebug("%p SqlObj is freed, not add into queue async res", pSql);
return;
......@@ -531,6 +531,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
_error:
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
}
......@@ -272,7 +272,7 @@ void tscSCreateCallBack(void *param, TAOS_RES *tres, int code) {
if (pRes->code != TSDB_CODE_SUCCESS) {
taos_free_result(pSql);
free(builder);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -290,7 +290,7 @@ void tscSCreateCallBack(void *param, TAOS_RES *tres, int code) {
if (pRes->code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, code);
} else {
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
}
}
}
......@@ -924,7 +924,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
(*pSql->fp)(pSql->param, pSql, code);
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS){
} else {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
return code;
}
......@@ -1429,7 +1429,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
fclose(fp);
pParentSql->res.code = code;
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
} while (0);
}
......@@ -1500,7 +1500,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
code = doPackSendDataBlock(pSql, count, pTableDataBlock);
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -1535,7 +1535,7 @@ void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql) {
tscError("%p failed to open file %s to load data from file, code:%s", pSql, pCmd->payload, tstrerror(pSql->res.code));
tfree(pSupporter);
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......
......@@ -439,7 +439,7 @@ int doProcessSql(SSqlObj *pSql) {
}
if (pRes->code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return pRes->code;
}
......@@ -448,7 +448,7 @@ int doProcessSql(SSqlObj *pSql) {
// NOTE: if code is TSDB_CODE_SUCCESS, pSql may have been released here already by other threads.
if (code != TSDB_CODE_SUCCESS) {
pRes->code = code;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return code;
}
......@@ -1528,7 +1528,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
if (code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, pSql->res.numOfRows);
} else {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
}
......@@ -1557,7 +1557,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
int32_t code = pRes->code;
if (pRes->code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return code;
}
......@@ -1576,7 +1576,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
if (pRes->code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
} else {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
return code;
......@@ -2357,7 +2357,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
}
/**
* retrieve table meta from mnode, and update the local table meta cache.
* retrieve table meta from mnode, and update the local table meta hashmap.
* @param pSql sql object
* @param tableIndex table index
* @return status code
......@@ -2365,16 +2365,18 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
const char* name = pTableMetaInfo->name;
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMetaInfo->pTableMeta) {
tscDebug("%p update table meta, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64 ", addr:%p", pSql,
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid, pTableMeta);
if (pTableMeta) {
tscDebug("%p update table meta:%s, old meta numOfTags:%d, numOfCols:%d, uid:%" PRId64, pSql, name,
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid);
}
taosHashRemove(tscTableMetaInfo, pTableMetaInfo->name, strnlen(pTableMetaInfo->name, TSDB_TABLE_FNAME_LEN));
// remove stored tableMeta info in hash table
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
return getTableMetaFromMnode(pSql, pTableMetaInfo);
}
......
......@@ -709,7 +709,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
pSubObj->rpcRid = -1;
}
tscQueueAsyncRes(pSubObj);
tscAsyncResultOnError(pSubObj);
taosReleaseRef(tscObjRef, pSubObj->self);
}
......@@ -745,7 +745,7 @@ void taos_stop_query(TAOS_RES *res) {
pSql->rpcRid = -1;
}
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
}
......
......@@ -779,7 +779,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -796,7 +796,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -845,7 +845,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (code != TSDB_CODE_SUCCESS) {
freeJoinSubqueryObj(pParentSql);
pParentSql->res.code = code;
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
taosArrayDestroy(s1);
taosArrayDestroy(s2);
......@@ -916,7 +916,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -930,7 +930,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -1028,7 +1028,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
pParentSql->res.code = numOfRows;
tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows));
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -1155,7 +1155,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
} else {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
return;
......@@ -1233,7 +1233,7 @@ void tscFetchDatablockForSubquery(SSqlObj* pSql) {
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
} else {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
return;
......@@ -1344,7 +1344,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -1357,7 +1357,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
pParentSql->res.code = code;
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
return;
}
......@@ -1403,7 +1403,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
}
}
}
......@@ -1612,7 +1612,7 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
_error:
pRes->code = code;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) {
......@@ -1666,7 +1666,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, &pFinalModel, nBufferSize);
if (ret != 0) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
tfree(pMemoryBuf);
return ret;
}
......@@ -1680,7 +1680,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, pFinalModel,pState->numOfSub);
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return ret;
}
......@@ -1890,7 +1890,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
} else { // regular super table query
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
}
}
}
......@@ -1968,7 +1968,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
tscQueueAsyncRes(pParentSql);
tscAsyncResultOnError(pParentSql);
}
}
......@@ -2220,7 +2220,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
(*pParentObj->fp)(pParentObj->param, pParentObj, v);
} else {
if (!needRetryInsert(pParentObj, numOfSub)) {
tscQueueAsyncRes(pParentObj);
tscAsyncResultOnError(pParentObj);
return;
}
......@@ -2265,7 +2265,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
if (code != TSDB_CODE_SUCCESS) {
pParentObj->res.code = code;
tscQueueAsyncRes(pParentObj);
tscAsyncResultOnError(pParentObj);
return;
}
......@@ -2289,7 +2289,7 @@ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) {
int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return code; // here the pSql may have been released already.
}
......@@ -2482,7 +2482,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes* pRes = &pSql->res;
if (pRes->code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -2497,7 +2497,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
if (pRes->tsrow == NULL || pRes->buffer == NULL || pRes->length == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
return;
}
......@@ -2509,7 +2509,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
if (pRes->code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
} else {
tscQueueAsyncRes(pSql);
tscAsyncResultOnError(pSql);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册