提交 5d83a353 编写于 作者: H Haojun Liao

[TD-225] refactor codes.

上级 4a939018
...@@ -2172,6 +2172,15 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { ...@@ -2172,6 +2172,15 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) {
return true; return true;
} }
static void doFreeInsertSupporter(SSqlObj* pSqlObj) {
assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0);
for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) {
SSqlObj* pSql = pSqlObj->pSubs[i];
tfree(pSql->param);
}
}
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
SInsertSupporter *pSupporter = (SInsertSupporter *)param; SInsertSupporter *pSupporter = (SInsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql; SSqlObj* pParentObj = pSupporter->pSql;
...@@ -2203,10 +2212,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2203,10 +2212,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
if (pParentObj->res.code == TSDB_CODE_SUCCESS) { if (pParentObj->res.code == TSDB_CODE_SUCCESS) {
tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
for(int32_t i = 0; i < numOfSub; ++i) { doFreeInsertSupporter(pParentObj);
SSqlObj* pSql = pParentObj->pSubs[i];
tfree(pSql->param);
}
// todo remove this parameter in async callback function definition. // todo remove this parameter in async callback function definition.
// all data has been sent to vnode, call user function // all data has been sent to vnode, call user function
...@@ -2214,6 +2220,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2214,6 +2220,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
(*pParentObj->fp)(pParentObj->param, pParentObj, v); (*pParentObj->fp)(pParentObj->param, pParentObj, v);
} else { } else {
if (!needRetryInsert(pParentObj, numOfSub)) { if (!needRetryInsert(pParentObj, numOfSub)) {
doFreeInsertSupporter(pParentObj);
tscQueueAsyncRes(pParentObj); tscQueueAsyncRes(pParentObj);
return; return;
} }
...@@ -2244,7 +2251,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -2244,7 +2251,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
pParentObj->cmd.parseFinished = false; pParentObj->cmd.parseFinished = false;
pParentObj->subState.numOfRemain = numOfFailed; pParentObj->subState.numOfRemain = numOfFailed;
pParentObj->subState.numOfSub = numOfFailed;
tscResetSqlCmdObj(&pParentObj->cmd, false); tscResetSqlCmdObj(&pParentObj->cmd, false);
......
...@@ -703,10 +703,11 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se ...@@ -703,10 +703,11 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
return forwardStep; return forwardStep;
} }
static UNUSED_FUNC void updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, int32_t* numOfClosed, TSKEY lastKey, bool ascQuery) { static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY lastKey, bool ascQuery) {
int32_t i = 0; int32_t i = 0;
int64_t skey = TSKEY_INITIAL_VAL; int64_t skey = TSKEY_INITIAL_VAL;
int32_t numOfClosed = 0;
for (i = 0; i < pWindowResInfo->size; ++i) { for (i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pResult = pWindowResInfo->pResult[i]; SResultRow *pResult = pWindowResInfo->pResult[i];
if (pResult->closed) { if (pResult->closed) {
...@@ -732,11 +733,11 @@ static UNUSED_FUNC void updateResultRowCurrentIndex(SResultRowInfo* pWindowResIn ...@@ -732,11 +733,11 @@ static UNUSED_FUNC void updateResultRowCurrentIndex(SResultRowInfo* pWindowResIn
} }
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey; pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey;
return numOfClosed;
} }
/** /**
* NOTE: the query status only set for the first scan of master scan. * NOTE: the query status only set for the first scan of master scan.
* TODO refactor
*/ */
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) { static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
...@@ -755,34 +756,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe ...@@ -755,34 +756,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
pWindowResInfo->curIndex = pWindowResInfo->size - 1; pWindowResInfo->curIndex = pWindowResInfo->size - 1;
setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL);
} else { // set the current index to be the last unclosed window } else { // set the current index to be the last unclosed window
int32_t i = 0; numOfClosed = updateResultRowCurrentIndex(pWindowResInfo, lastKey, ascQuery);
int64_t skey = TSKEY_INITIAL_VAL;
for (i = 0; i < pWindowResInfo->size; ++i) {
SResultRow *pResult = pWindowResInfo->pResult[i];
if (pResult->closed) {
numOfClosed += 1;
continue;
}
TSKEY ekey = pResult->win.ekey;
if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
closeTimeWindow(pWindowResInfo, i);
} else {
skey = pResult->win.skey;
break;
}
}
// all windows are closed, set the last one to be the skey
if (skey == TSKEY_INITIAL_VAL) {
assert(i == pWindowResInfo->size);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
} else {
pWindowResInfo->curIndex = i;
}
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey;
// the number of completed slots are larger than the threshold, return current generated results to client. // the number of completed slots are larger than the threshold, return current generated results to client.
if (numOfClosed > pQuery->rec.threshold) { if (numOfClosed > pQuery->rec.threshold) {
...@@ -4501,6 +4475,18 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc ...@@ -4501,6 +4475,18 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
} else { } else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
} }
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
// TODO refactor
if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) {
closeAllTimeWindow(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
} else {
updateResultRowCurrentIndex(pWindowResInfo, pTableQueryInfo->lastKey, ascQuery);
}
}
} }
bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) { bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册