提交 001d28f2 编写于 作者: H Haojun Liao

[TD-225] refactor codes.

上级 62127398
...@@ -56,7 +56,6 @@ typedef struct SLocalReducer { ...@@ -56,7 +56,6 @@ typedef struct SLocalReducer {
tFilePage * pTempBuffer; tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx; struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result. int32_t rowSize; // size of each intermediate result.
int32_t status; // denote it is in reduce process, in reduce process, it
bool hasPrevRow; // cannot be released bool hasPrevRow; // cannot be released
bool hasUnprocessedRow; bool hasUnprocessedRow;
tOrderDescriptor * pDesc; tOrderDescriptor * pDesc;
......
...@@ -493,13 +493,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) { ...@@ -493,13 +493,6 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
// there is no more result, so we release all allocated resource // there is no more result, so we release all allocated resource
SLocalReducer *pLocalReducer = (SLocalReducer *)atomic_exchange_ptr(&pRes->pLocalReducer, NULL); SLocalReducer *pLocalReducer = (SLocalReducer *)atomic_exchange_ptr(&pRes->pLocalReducer, NULL);
if (pLocalReducer != NULL) { if (pLocalReducer != NULL) {
int32_t status = 0;
while ((status = atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY,
TSC_LOCALREDUCE_TOBE_FREED)) == TSC_LOCALREDUCE_IN_PROGRESS) {
taosMsleep(100);
tscDebug("%p waiting for delete procedure, status: %d", pSql, status);
}
pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo); pLocalReducer->pFillInfo = taosDestroyFillInfo(pLocalReducer->pFillInfo);
if (pLocalReducer->pCtx != NULL) { if (pLocalReducer->pCtx != NULL) {
...@@ -1437,24 +1430,13 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1437,24 +1430,13 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
SLocalReducer *pLocalReducer = pRes->pLocalReducer; SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tFilePage *tmpBuffer = pLocalReducer->pTempBuffer;
// set the data merge in progress
int32_t prevStatus =
atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS);
if (prevStatus != TSC_LOCALREDUCE_READY) {
assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already
return TSDB_CODE_SUCCESS;
}
tFilePage *tmpBuffer = pLocalReducer->pTempBuffer;
if (doHandleLastRemainData(pSql)) { if (doHandleLastRemainData(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (doBuildFilledResultForGroup(pSql)) { if (doBuildFilledResultForGroup(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1510,7 +1492,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1510,7 +1492,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
pLocalReducer->discardData->num = 0; pLocalReducer->discardData->num = 0;
if (saveGroupResultInfo(pSql)) { if (saveGroupResultInfo(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1556,7 +1537,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1556,7 +1537,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
// here we do not check the return value // here we do not check the return value
adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree); adjustLoserTreeFromNewData(pLocalReducer, pOneDataSrc, pTree);
assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS);
if (pRes->numOfRows == 0) { if (pRes->numOfRows == 0) {
handleUnprocessedRow(pCmd, pLocalReducer, tmpBuffer); handleUnprocessedRow(pCmd, pLocalReducer, tmpBuffer);
...@@ -1567,7 +1547,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1567,7 +1547,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
* If previous group is not skipped, keep it in pRes->numOfGroups * If previous group is not skipped, keep it in pRes->numOfGroups
*/ */
if (notSkipped && saveGroupResultInfo(pSql)) { if (notSkipped && saveGroupResultInfo(pSql)) {
pLocalReducer->status = TSC_LOCALREDUCE_READY;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1587,7 +1566,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1587,7 +1566,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
if (pRes->numOfRows == 0) { if (pRes->numOfRows == 0) {
continue; continue;
} else { } else {
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else { // result buffer is not full } else { // result buffer is not full
...@@ -1612,9 +1590,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) { ...@@ -1612,9 +1590,6 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
genFinalResults(pSql, pLocalReducer, true); genFinalResults(pSql, pLocalReducer, true);
} }
assert(pLocalReducer->status == TSC_LOCALREDUCE_IN_PROGRESS && pRes->row == 0);
pLocalReducer->status = TSC_LOCALREDUCE_READY; // set the flag, taos_free_result can release this result.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册