diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index af3399bc2c4a09787af3710f390e702a168bbd84..b9d622a4c38bf7d7e5abd9a9b69e0ba18f820f81 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -153,17 +153,17 @@ typedef struct { SSchemaWrapper* qsw; } SSchemaInfo; -typedef struct { +typedef struct SExchangeOpStopInfo { int32_t operatorType; int64_t refId; } SExchangeOpStopInfo; -typedef struct { +typedef struct STaskStopInfo { SRWLatch lock; SArray* pStopInfo; } STaskStopInfo; -typedef struct SExecTaskInfo { +struct SExecTaskInfo { STaskIdInfo id; uint32_t status; STimeWindow window; @@ -260,7 +260,7 @@ typedef struct SExchangeInfo { // SArray, result block list, used to keep the multi-block that // passed by downstream operator - SArray* pReadyBlocks; + SArray* pResultBlockList; SArray* pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy. SSDataBlock* pDummyBlock; // dummy block, not keep data bool seqLoadData; // sequential load data or not, false by default diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index cf1f5aa290ab8005975bcfe672765ebaf12b6e25..52aa3db0fdab18e5a8f8bf34de568ced8a6cb2bc 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -182,7 +182,7 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) { } // we have buffered retrieved datablock, return it directly - SSDataBlock** p = taosArrayPop(pExchangeInfo->pReadyBlocks); + SSDataBlock** p = taosArrayPop(pExchangeInfo->pResultBlockList); if (p != NULL) { taosArrayPush(pExchangeInfo->pRecycledBlocks, p); return *p; @@ -193,10 +193,10 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) { concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); } - if (taosArrayGetSize(pExchangeInfo->pReadyBlocks) == 0) { + if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) { return NULL; } else { - p = taosArrayPop(pExchangeInfo->pReadyBlocks); + p = taosArrayPop(pExchangeInfo->pResultBlockList); taosArrayPush(pExchangeInfo->pRecycledBlocks, p); return *p; } @@ -298,7 +298,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode tsem_init(&pInfo->ready, 0, 0); pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc); - pInfo->pReadyBlocks = taosArrayInit(64, POINTER_BYTES); + pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES); pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES); SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; @@ -346,7 +346,7 @@ void doDestroyExchangeOperatorInfo(void* param) { taosArrayDestroy(pExInfo->pSources); taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo); - taosArrayDestroyEx(pExInfo->pReadyBlocks, freeBlock); + taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock); taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock); blockDataDestroy(pExInfo->pDummyBlock); diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 2ec882d1de7460299f46d78ae828395fc81949e5..307a82e256f694416a3bc386e510613bf8bc313b 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -30,6 +30,8 @@ typedef struct SSumRes { double dsum; }; int16_t type; + int64_t prevTs; + bool isPrevTsSet; } SSumRes; typedef struct SMinmaxResInfo {