提交 25ee3b48 编写于 作者: H Haojun Liao

[td-225] refactor client code

上级 6a88d306
......@@ -72,17 +72,10 @@ typedef struct SLocalReducer {
bool orderPrjOnSTable; // projection query on stable
} SLocalReducer;
typedef struct SSubqueryState {
int32_t numOfRemain; // the number of remain unfinished subquery
int32_t numOfTotal; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState;
typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tOrderDescriptor *pOrderDescriptor;
SColumnModel * pFinalColModel; // colModel for final result
SSubqueryState * pState;
int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
......
......@@ -66,7 +66,6 @@ typedef struct STidTags {
#pragma pack(pop)
typedef struct SJoinSupporter {
SSubqueryState* pState;
SSqlObj* pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query
SInterval interval;
......
......@@ -334,6 +334,12 @@ typedef struct STscObj {
T_REF_DECLARE()
} STscObj;
typedef struct SSubqueryState {
int32_t numOfRemain; // the number of remain unfinished subquery
int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState;
typedef struct SSqlObj {
void *signature;
pthread_t owner; // owner of sql object, by which it is executed
......@@ -355,10 +361,11 @@ typedef struct SSqlObj {
tsem_t rspSem;
SSqlCmd cmd;
SSqlRes res;
uint16_t numOfSubs;
SSubqueryState subState;
struct SSqlObj **pSubs;
struct SSqlObj * prev, *next;
struct SSqlObj *prev, *next;
struct SSqlObj **self;
} SSqlObj;
......
......@@ -361,15 +361,6 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
(*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
}
void tscProcessAsyncRes(SSchedMsg *pMsg) {
SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
SSqlRes *pRes = &pSql->res;
assert(pSql->fp != NULL && pSql->fetchFp != NULL);
pSql->fp = pSql->fetchFp;
(*pSql->fp)(pSql->param, pSql, pRes->code);
}
// this function will be executed by queue task threads, so the terrno is not valid
static void tscProcessAsyncError(SSchedMsg *pMsg) {
void (*fp)() = pMsg->ahandle;
......@@ -393,22 +384,15 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
if (pSql == NULL || pSql->signature != pSql) {
tscDebug("%p SqlObj is freed, not add into queue async res", pSql);
return;
} else {
tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
}
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessAsyncRes;
schedMsg.ahandle = pSql;
schedMsg.thandle = (void *)1;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
}
tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
void tscProcessAsyncFree(SSchedMsg *pMsg) {
SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
tscDebug("%p sql is freed", pSql);
taos_free_result(pSql);
SSqlRes *pRes = &pSql->res;
assert(pSql->fp != NULL && pSql->fetchFp != NULL);
pSql->fp = pSql->fetchFp;
(*pSql->fp)(pSql->param, pSql, pRes->code);
}
int tscSendMsgToServer(SSqlObj *pSql);
......
......@@ -639,7 +639,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->numOfSubs);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->subState.numOfSub);
if (*pMemBuffer == NULL) {
tscError("%p failed to allocate memory", pSql);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......
......@@ -1603,8 +1603,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
return TSDB_CODE_SUCCESS;
}
static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc, char* aliasName,
int32_t resColIdx, SColumnIndex* pColIndex) {
static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc,
char* aliasName, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) {
const char* msg1 = "not support column types";
int16_t type = 0;
......@@ -1650,8 +1650,13 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscColumnListInsert(pQueryInfo->colList, &index);
// if it is not in the final result, do not add it
SColumnList ids = getColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr);
if (finalResult) {
insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, columnName, pExpr);
} else {
tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0]));
}
return TSDB_CODE_SUCCESS;
}
......@@ -1926,7 +1931,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) {
index.columnIndex = j;
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index) != 0) {
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex++, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
}
......@@ -1943,7 +1948,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if ((index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) || (index.columnIndex < 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index) != 0) {
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex + i, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -1980,7 +1986,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) {
SColumnIndex index = {.tableIndex = j, .columnIndex = i};
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index) != 0) {
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem->aliasName, colIndex, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
......
......@@ -24,7 +24,6 @@
#include "tschemautil.h"
#include "tsclient.h"
#include "ttimer.h"
#include "tutil.h"
#include "tlockfree.h"
SRpcCorEpSet tscMgmtEpSet;
......@@ -478,20 +477,29 @@ void tscKillSTableQuery(SSqlObj *pSql) {
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
for (int i = 0; i < pSql->numOfSubs; ++i) {
for (int i = 0; i < pSql->subState.numOfSub; ++i) {
// NOTE: pSub may have been released already here
SSqlObj *pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
pSub->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
if (pSub->pRpcCtx != NULL) {
rpcCancelRequest(pSub->pRpcCtx);
pSub->pRpcCtx = NULL;
void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE));
if (p == NULL) {
continue;
}
SSqlObj* pSubObj = (SSqlObj*) (*p);
assert(pSubObj->self == (SSqlObj**) p);
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
if (pSubObj->pRpcCtx != NULL) {
rpcCancelRequest(pSubObj->pRpcCtx);
pSubObj->pRpcCtx = NULL;
}
tscQueueAsyncRes(pSub); // async res? not other functions?
tscQueueAsyncRes(pSubObj); // async res? not other functions?
taosCacheRelease(tscObjCache, (void**) &p, false);
}
tscDebug("%p super table query cancelled", pSql);
......@@ -1455,7 +1463,7 @@ int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SSqlCmd* pCmd = &pSql->cmd;
int32_t code = pRes->code;
if (pRes->code != TSDB_CODE_SUCCESS) {
......
......@@ -523,7 +523,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
pRes->numOfClauseTotal = 0;
pRes->rspType = 0;
pSql->numOfSubs = 0;
pSql->subState.numOfSub = 0;
taosTFree(pSql->pSubs);
assert(pSql->fp == NULL);
......@@ -559,7 +559,7 @@ int taos_select_db(TAOS *taos, const char *db) {
}
// send free message to vnode to free qhandle and corresponding resources in vnode
static bool tscKillQueryInDnode(SSqlObj* pSql) {
static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
......@@ -568,10 +568,18 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
if (pQueryInfo == NULL) {
return true;
}
if (pSql->pRpcCtx != NULL) {
rpcCancelRequest(pSql->pRpcCtx);
pSql->pRpcCtx = NULL;
return true;
} else {
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
}
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tscRemoveFromSqlList(pSql);
......@@ -700,6 +708,7 @@ void taos_stop_query(TAOS_RES *res) {
} else { // TODO multithreads bug
if (pSql->cmd.command < TSDB_SQL_LOCAL && pSql->pRpcCtx != NULL) {
rpcCancelRequest(pSql->pRpcCtx);
pSql->pRpcCtx = NULL;
}
}
......
......@@ -274,7 +274,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), false);
tscFreeSqlResult(pSql);
taosTFree(pSql->pSubs);
pSql->numOfSubs = 0;
pSql->subState.numOfSub = 0;
taosTFree(pTableMetaInfo->vgroupList);
tscSetNextLaunchTimer(pStream, pSql);
}
......
此差异已折叠。
......@@ -360,26 +360,26 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
tscFreeSqlResult(pSql);
taosTFree(pSql->pSubs);
pSql->numOfSubs = 0;
pSql->subState.numOfSub = 0;
pSql->self = 0;
tscResetSqlCmdObj(pCmd, false);
}
static UNUSED_FUNC void tscFreeSubobj(SSqlObj* pSql) {
if (pSql->numOfSubs == 0) {
static void tscFreeSubobj(SSqlObj* pSql) {
if (pSql->subState.numOfSub == 0) {
return;
}
tscDebug("%p start to free sub SqlObj, numOfSub:%d", pSql, pSql->numOfSubs);
tscDebug("%p start to free sub SqlObj, numOfSub:%d", pSql, pSql->subState.numOfSub);
for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
tscDebug("%p free sub SqlObj:%p, index:%d", pSql, pSql->pSubs[i], i);
taos_free_result(pSql->pSubs[i]);
pSql->pSubs[i] = NULL;
}
pSql->numOfSubs = 0;
pSql->subState.numOfSub = 0;
}
/**
......@@ -415,7 +415,9 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscDebug("%p start to free sqlObj", pSql);
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
tscFreeSubobj(pSql);
tscPartiallyFreeSqlObj(pSql);
pSql->signature = NULL;
......@@ -2284,7 +2286,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
*
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/
pSql->numOfSubs = 0;
pSql->subState.numOfSub = 0;
pCmd->command = TSDB_SQL_SELECT;
tscResetForNextRetrieve(pRes);
......@@ -2316,7 +2318,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
pRes->numOfTotal = num;
taosTFree(pSql->pSubs);
pSql->numOfSubs = 0;
pSql->subState.numOfSub = 0;
pSql->fp = fp;
tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);
......
......@@ -261,6 +261,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
return code;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册