diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h
index 367e656f064a200ace8a051a88a554808f7dae61..706df11cf94898053f90b9d6a45d7896c2ec1098 100644
--- a/source/client/inc/clientInt.h
+++ b/source/client/inc/clientInt.h
@@ -224,12 +224,12 @@ typedef struct SRequestObj {
SArray* tableList;
SQueryExecMetric metric;
SRequestSendRecvBody body;
- bool stableQuery; // todo refactor
- bool validateOnly; // todo refactor
-
- bool killed;
- uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
- uint32_t retry;
+ bool syncQuery; // todo refactor: async query object
+ bool stableQuery; // todo refactor
+ bool validateOnly; // todo refactor
+ bool killed;
+ uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
+ uint32_t retry;
} SRequestObj;
typedef struct SSyncQueryParam {
diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c
index 207ac01a2c46eb50123ce6e108c617d74560de74..9e67dc6571aae2401bda0a18f348a602dad1790d 100644
--- a/source/client/src/clientEnv.c
+++ b/source/client/src/clientEnv.c
@@ -320,6 +320,10 @@ void doDestroyRequest(void *p) {
deregisterRequest(pRequest);
}
+ if (pRequest->syncQuery) {
+ taosMemoryFree(pRequest->body.param);
+ }
+
taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
}
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index 2a9d113108a565694091de4c3ea0c78431c50b57..d7c2c26d231c16ecef385127d9ff9a349420d18e 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -307,7 +307,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
taosThreadMutexUnlock(&appInfo.mutex);
tFreeClientHbBatchRsp(&pRsp);
-
+ taosMemoryFree(pMsg->pData);
return code;
}
diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c
index d846cb93af6a2c16fd21d2f954b8393e426aa27a..783f5450fbde13761e94d34d41c96923551f2219 100644
--- a/source/client/src/clientImpl.c
+++ b/source/client/src/clientImpl.c
@@ -2047,6 +2047,7 @@ void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
void syncQueryFn(void* param, void* res, int32_t code) {
SSyncQueryParam* pParam = param;
pParam->pRequest = res;
+
if (pParam->pRequest) {
pParam->pRequest->code = code;
}
@@ -2093,6 +2094,8 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly);
tsem_wait(¶m->sem);
+
+ param->pRequest->syncQuery = true;
return param->pRequest;
#else
size_t sqlLen = strlen(sql);
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index d5d52d41ac7bd3827af06419a4bd88823dd3c119..84d503b972bde790ad3a65cdcf3e592a8e970071 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -858,8 +858,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId);
-SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
- SExecTaskInfo* pTaskInfo);
+SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId,
diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c
index 95b4fdcd6e626ea66421beb2c543db8b784e8c4c..76d853ef3ecd784aa5c09b5b7b5da290d38fe48c 100644
--- a/source/libs/executor/src/executil.c
+++ b/source/libs/executor/src/executil.c
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include
+#include "ttime.h"
#include "function.h"
#include "functionMgt.h"
#include "index.h"
@@ -603,13 +603,15 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
}
for (int32_t i = 0; i < numOfOutput; ++i) {
- if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0 ||
- strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_group_key") == 0) {
+ const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
+ if ((strcmp(pName, "_select_value") == 0) ||
+ (strcmp(pName, "_group_key") == 0)) {
pValCtx[num++] = &pCtx[i];
} else if (fmIsSelectFunc(pCtx[i].functionId)) {
p = &pCtx[i];
}
}
+
#ifdef BUF_PAGE_DEBUG
qDebug("page_setSelect num:%d", num);
#endif
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 266567ec1bac274a33e0cfb87faf5f86c0b94abc..4e010cfdf11ef9940cd5f005cb06b73ffa48a736 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -3330,7 +3330,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
return (rows > 0) ? pInfo->pRes : NULL;
}
-static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
+static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
@@ -3341,25 +3341,26 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
- doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity);
+ int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
+ taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
+
pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId;
pInfo->existNewGroupBlock = NULL;
-// *newgroup = true;
}
-static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup,
+static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
SExecTaskInfo* pTaskInfo) {
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
-// *newgroup = false;
- doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity);
- if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) {
+ int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows;
+ taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows);
+ if (pInfo->pRes->info.rows > pResultInfo->threshold) {
return;
}
}
// handle the cached new group data block
if (pInfo->existNewGroupBlock) {
- doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
+ doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
}
}
@@ -3372,8 +3373,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
blockDataCleanup(pResBlock);
- doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, NULL, pTaskInfo);
- if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) {
+ doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
+ if (pResBlock->info.rows > pResultInfo->threshold || pResBlock->info.rows > 0) {
return pResBlock;
}
@@ -3407,7 +3408,9 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
}
blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity);
- doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity);
+
+ int32_t numOfResultRows = pOperator->resultInfo.capacity - pBlock->info.rows;
+ taosFillResultDataBlock(pInfo->pFillInfo, pBlock, numOfResultRows);
// current group has no more result to return
if (pResBlock->info.rows > 0) {
@@ -3417,13 +3420,13 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
return pResBlock;
}
- doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, NULL, pTaskInfo);
+ doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
return pResBlock;
}
} else if (pInfo->existNewGroupBlock) { // try next group
assert(pBlock != NULL);
- doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, NULL, pTaskInfo);
+ doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > pResultInfo->threshold) {
return pResBlock;
}
@@ -4032,8 +4035,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
}
}
-SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, bool multigroupResult,
- SExecTaskInfo* pTaskInfo) {
+SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo) {
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@@ -4065,7 +4067,6 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
}
pInfo->pRes = pResBlock;
- pInfo->multigroupResult = multigroupResult;
pInfo->pCondition = pPhyFillNode->node.pConditions;
pInfo->pColMatchColInfo = pColMatchColInfo;
pOperator->name = "FillOperator";
@@ -4438,21 +4439,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
// return NULL;
// }
- int32_t code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo);
+ int32_t code = extractTableSchemaInfo(pHandle, pScanNode->uid, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
}
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
- if (pScanNode->scan.tableType == TSDB_SUPER_TABLE) {
- code = vnodeGetAllTableList(pHandle->vnode, pScanNode->scan.uid, pTableListInfo->pTableList);
+ if (pScanNode->tableType == TSDB_SUPER_TABLE) {
+ code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = terrno;
return NULL;
}
} else { // Create one table group.
- STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->scan.uid, .groupId = 0};
+ STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->uid, .groupId = 0};
taosArrayPush(pTableListInfo->pTableList, &info);
}
@@ -4605,7 +4606,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {
pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) {
- pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, false, pTaskInfo);
+ pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) {
pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) {
diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c
index 1b893739bd846f8e94911c03bbd562d13d6a0370..d195c22c373239c372e0d4cd92b4ba36785e9393 100644
--- a/source/libs/scheduler/src/schRemote.c
+++ b/source/libs/scheduler/src/schRemote.c
@@ -444,14 +444,12 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
trans.pHandle = pMsg->handle;
SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
-
SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
_return:
-
tFreeSSchedulerHbRsp(&rsp);
taosMemoryFree(param);
-
+ taosMemoryFree(pMsg->pData);
SCH_RET(code);
}