提交 09662138 编写于 作者: H Haojun Liao

fix(arch): set the correct module api dependency.

上级 da9f33a4
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "tmsg.h" #include "tmsg.h"
#include "tcommon.h" #include "tcommon.h"
#include "index.h" #include "index.h"
#include "function.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -404,41 +405,9 @@ int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); ...@@ -404,41 +405,9 @@ int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
*/ */
} SStoreMeta; } SStoreMeta;
typedef struct STdbState {
void* rocksdb;
void** pHandle;
void* writeOpts;
void* readOpts;
void** cfOpts;
void* dbOpt;
struct SStreamTask* pOwner;
void* param;
void* env;
SListNode* pComparNode;
void* pBackendHandle;
char idstr[64];
void* compactFactory;
void* db;
void* pStateDb;
void* pFuncStateDb;
void* pFillStateDb; // todo refactor
void* pSessionStateDb;
void* pParNameDb;
void* pParTagDb;
void* txn;
} STdbState;
// incremental state storage
typedef struct {
STdbState* pTdbState;
struct SStreamFileState* pFileState;
int32_t number;
SSHashObj* parNameMap;
int64_t checkPointId;
int32_t taskId;
int64_t streamId;
} SStreamState;
typedef struct SUpdateInfo { typedef struct SUpdateInfo {
SArray *pTsBuckets; SArray *pTsBuckets;
...@@ -546,6 +515,7 @@ typedef struct SStorageAPI { ...@@ -546,6 +515,7 @@ typedef struct SStorageAPI {
SStoreTqReader tqReaderFn; SStoreTqReader tqReaderFn;
SStateStore stateStore; SStateStore stateStore;
SMetaDataFilterAPI metaFilter; SMetaDataFilterAPI metaFilter;
SFunctionStateStore functionStore;
} SStorageAPI; } SStorageAPI;
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -22,6 +22,7 @@ extern "C" { ...@@ -22,6 +22,7 @@ extern "C" {
#include "tcommon.h" #include "tcommon.h"
#include "tvariant.h" #include "tvariant.h"
#include "tsimplehash.h"
struct SqlFunctionCtx; struct SqlFunctionCtx;
struct SResultRowEntryInfo; struct SResultRowEntryInfo;
...@@ -127,18 +128,59 @@ typedef struct SSerializeDataHandle { ...@@ -127,18 +128,59 @@ typedef struct SSerializeDataHandle {
void *pState; void *pState;
} SSerializeDataHandle; } SSerializeDataHandle;
// incremental state storage
typedef struct STdbState {
void* rocksdb;
void** pHandle;
void* writeOpts;
void* readOpts;
void** cfOpts;
void* dbOpt;
struct SStreamTask* pOwner;
void* param;
void* env;
SListNode* pComparNode;
void* pBackendHandle;
char idstr[64];
void* compactFactory;
void* db;
void* pStateDb;
void* pFuncStateDb;
void* pFillStateDb; // todo refactor
void* pSessionStateDb;
void* pParNameDb;
void* pParTagDb;
void* txn;
} STdbState;
typedef struct {
STdbState* pTdbState;
struct SStreamFileState* pFileState;
int32_t number;
SSHashObj* parNameMap;
int64_t checkPointId;
int32_t taskId;
int64_t streamId;
} SStreamState;
typedef struct SFunctionStateStore {
int32_t (*streamStateFuncPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t (*streamStateFuncGet)(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen);
} SFunctionStateStore;
// sql function runtime context // sql function runtime context
typedef struct SqlFunctionCtx { typedef struct SqlFunctionCtx {
SInputColumnInfoData input; SInputColumnInfoData input;
SResultDataInfo resDataInfo; SResultDataInfo resDataInfo;
uint32_t order; // data block scanner order: asc|desc uint32_t order; // data block scanner order: asc|desc
uint8_t isPseudoFunc;// denote current function is pseudo function or not [added for perf reason] uint8_t isPseudoFunc; // denote current function is pseudo function or not [added for perf reason]
uint8_t isNotNullFunc;// not return null value. uint8_t isNotNullFunc; // not return null value.
uint8_t scanFlag; // record current running step, default: 0 uint8_t scanFlag; // record current running step, default: 0
int16_t functionId; // function id int16_t functionId; // function id
char *pOutput; // final result output buffer, point to sdata->data char *pOutput; // final result output buffer, point to sdata->data
// input parameter, e.g., top(k, 20), the number of results of top query is kept in param // input parameter, e.g., top(k, 20), the number of results of top query is kept in param
SFunctParam *param; SFunctParam *param;
// corresponding output buffer for timestamp of each result, e.g., diff/csum // corresponding output buffer for timestamp of each result, e.g., diff/csum
SColumnInfoData *pTsOutput; SColumnInfoData *pTsOutput;
int32_t numOfParams; int32_t numOfParams;
...@@ -155,6 +197,7 @@ typedef struct SqlFunctionCtx { ...@@ -155,6 +197,7 @@ typedef struct SqlFunctionCtx {
SSerializeDataHandle saveHandle; SSerializeDataHandle saveHandle;
int32_t exprIdx; int32_t exprIdx;
char *udfName; char *udfName;
SFunctionStateStore *pStore;
} SqlFunctionCtx; } SqlFunctionCtx;
typedef struct tExprNode { typedef struct tExprNode {
......
...@@ -66,16 +66,6 @@ int32_t streamStateCommit(SStreamState* pState); ...@@ -66,16 +66,6 @@ int32_t streamStateCommit(SStreamState* pState);
void streamStateDestroy(SStreamState* pState, bool remove); void streamStateDestroy(SStreamState* pState, bool remove);
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark); int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark);
//typedef struct {
// rocksdb_iterator_t* iter;
// rocksdb_snapshot_t* snapshot;
// rocksdb_readoptions_t* readOpt;
// rocksdb_t* db;
//
// TBC* pCur;
// int64_t number;
//} SStreamStateCur;
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen); int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen);
......
...@@ -24,6 +24,7 @@ static void initTqAPI(SStoreTqReader* pTq); ...@@ -24,6 +24,7 @@ static void initTqAPI(SStoreTqReader* pTq);
static void initStateStoreAPI(SStateStore* pStore); static void initStateStoreAPI(SStateStore* pStore);
static void initMetaReaderAPI(SStoreMetaReader* pMetaReader); static void initMetaReaderAPI(SStoreMetaReader* pMetaReader);
static void initMetaFilterAPI(SMetaDataFilterAPI* pFilter); static void initMetaFilterAPI(SMetaDataFilterAPI* pFilter);
static void initFunctionStateStore(SFunctionStateStore* pStore);
void initStorageAPI(SStorageAPI* pAPI) { void initStorageAPI(SStorageAPI* pAPI) {
initTsdbReaderAPI(&pAPI->tsdReader); initTsdbReaderAPI(&pAPI->tsdReader);
...@@ -32,6 +33,7 @@ void initStorageAPI(SStorageAPI* pAPI) { ...@@ -32,6 +33,7 @@ void initStorageAPI(SStorageAPI* pAPI) {
initStateStoreAPI(&pAPI->stateStore); initStateStoreAPI(&pAPI->stateStore);
initMetaReaderAPI(&pAPI->metaReaderFn); initMetaReaderAPI(&pAPI->metaReaderFn);
initMetaFilterAPI(&pAPI->metaFilter); initMetaFilterAPI(&pAPI->metaFilter);
initFunctionStateStore(&pAPI->functionStore);
} }
void initTsdbReaderAPI(TsdReader* pReader) { void initTsdbReaderAPI(TsdReader* pReader) {
...@@ -206,4 +208,9 @@ void initMetaFilterAPI(SMetaDataFilterAPI* pFilter) { ...@@ -206,4 +208,9 @@ void initMetaFilterAPI(SMetaDataFilterAPI* pFilter) {
pFilter->metaFilterTableIds = metaFilterTableIds; pFilter->metaFilterTableIds = metaFilterTableIds;
pFilter->metaFilterTableName = metaFilterTableName; pFilter->metaFilterTableName = metaFilterTableName;
pFilter->metaFilterTtl = metaFilterTtl; pFilter->metaFilterTtl = metaFilterTtl;
}
void initFunctionStateStore(SFunctionStateStore* pStore) {
pStore->streamStateFuncPut = streamStateFuncPut;
pStore->streamStateFuncGet = streamStateFuncGet;
} }
\ No newline at end of file
...@@ -166,7 +166,7 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId); ...@@ -166,7 +166,7 @@ void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId);
void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode);
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset, SFunctionStateStore* pStore);
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn); void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow); void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
......
...@@ -579,12 +579,11 @@ void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo); ...@@ -579,12 +579,11 @@ void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo); void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore);
void cleanupExprSupp(SExprSupp* pSup); void cleanupExprSupp(SExprSupp* pSup);
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey, void* pState); const char* pkey, void* pState, SFunctionStateStore* pStore);
void cleanupAggSup(SAggSupporter* pAggSup); void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
......
...@@ -84,7 +84,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN ...@@ -84,7 +84,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState); pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -95,7 +95,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN ...@@ -95,7 +95,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr);
} }
code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr); code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -485,8 +485,8 @@ void cleanupAggSup(SAggSupporter* pAggSup) { ...@@ -485,8 +485,8 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
} }
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey, void* pState) { const char* pkey, void* pState, SFunctionStateStore* pStore) {
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); int32_t code = initExprSupp(pSup, pExprInfo, numOfCols, pStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -126,7 +126,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -126,7 +126,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
if (pScanNode->scan.pScanPseudoCols != NULL) { if (pScanNode->scan.pScanPseudoCols != NULL) {
SExprSupp* p = &pInfo->pseudoExprSup; SExprSupp* p = &pInfo->pseudoExprSup;
p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs); p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs);
p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset); p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
} }
setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED,
......
...@@ -92,7 +92,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo ...@@ -92,7 +92,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
if (pEventWindowNode->window.pExprs != NULL) { if (pEventWindowNode->window.pExprs != NULL) {
int32_t numOfScalarExpr = 0; int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pEventWindowNode->window.pExprs, NULL, &numOfScalarExpr); SExprInfo* pScalarExprInfo = createExprInfo(pEventWindowNode->window.pExprs, NULL, &numOfScalarExpr);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -110,7 +110,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo ...@@ -110,7 +110,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState); pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -1544,7 +1544,7 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu ...@@ -1544,7 +1544,7 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset) { SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset, SFunctionStateStore* pStore) {
SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx)); SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
if (pFuncCtx == NULL) { if (pFuncCtx == NULL) {
return NULL; return NULL;
...@@ -1607,6 +1607,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, ...@@ -1607,6 +1607,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->numOfParams = pExpr->base.numOfParams; pCtx->numOfParams = pExpr->base.numOfParams;
pCtx->param = pFunct->pParam; pCtx->param = pFunct->pParam;
pCtx->saveHandle.currentPage = -1; pCtx->saveHandle.currentPage = -1;
pCtx->pStore = pStore;
} }
for (int32_t i = 1; i < numOfOutput; ++i) { for (int32_t i = 1; i < numOfOutput; ++i) {
......
...@@ -979,11 +979,11 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { ...@@ -979,11 +979,11 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return NULL; return NULL;
} }
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) { int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore) {
pSup->pExprInfo = pExprInfo; pSup->pExprInfo = pExprInfo;
pSup->numOfExprs = numOfExpr; pSup->numOfExprs = numOfExpr;
if (pSup->pExprInfo != NULL) { if (pSup->pExprInfo != NULL) {
pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset); pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset, pStore);
if (pSup->pCtx == NULL) { if (pSup->pCtx == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
......
...@@ -338,7 +338,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -338,7 +338,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
goto _error; goto _error;
} }
code = initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs); code = initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -355,7 +355,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -355,7 +355,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr); code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -525,7 +525,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, ...@@ -525,7 +525,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
pFillSup->cur.key = key.ts; pFillSup->cur.key = key.ts;
pFillSup->cur.pRowVal = curVal; pFillSup->cur.pRowVal = curVal;
void* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, &key); SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, &key);
SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId}; SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId};
void* preVal = NULL; void* preVal = NULL;
int32_t preVLen = 0; int32_t preVLen = 0;
...@@ -1139,7 +1139,8 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { ...@@ -1139,7 +1139,8 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
TSKEY endTs = ts; TSKEY endTs = ts;
uint64_t groupId = groupIds[pInfo->srcDelRowIndex]; uint64_t groupId = groupIds[pInfo->srcDelRowIndex];
SWinKey key = {.ts = ts, .groupId = groupId}; SWinKey key = {.ts = ts, .groupId = groupId};
void* pCur = pAPI->stateStore.streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &key); SStreamStateCur* pCur = pAPI->stateStore.streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &key);
if (!pCur) { if (!pCur) {
pInfo->srcDelRowIndex++; pInfo->srcDelRowIndex++;
continue; continue;
...@@ -1355,7 +1356,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod ...@@ -1355,7 +1356,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
} }
SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols); SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols);
code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols); code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols, &pAPI->functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyStreamFillSupporter(pFillSup); destroyStreamFillSupporter(pFillSup);
return NULL; return NULL;
...@@ -1491,7 +1492,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1491,7 +1492,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
goto _error; goto _error;
} }
code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols); code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -451,7 +451,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* ...@@ -451,7 +451,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
} }
pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys); pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -467,7 +467,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* ...@@ -467,7 +467,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str, code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState); pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -849,7 +849,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition ...@@ -849,7 +849,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
if (pPartNode->pExprs != NULL) { if (pPartNode->pExprs != NULL) {
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num); SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
int32_t code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num); int32_t code = initExprSupp(&pInfo->scalarSup, pExprInfo1, num, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
...@@ -1242,7 +1242,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1242,7 +1242,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
if (pPartNode->part.pExprs != NULL) { if (pPartNode->part.pExprs != NULL) {
int32_t num = 0; int32_t num = 0;
SExprInfo* pCalExprInfo = createExprInfo(pPartNode->part.pExprs, NULL, &num); SExprInfo* pCalExprInfo = createExprInfo(pPartNode->part.pExprs, NULL, &num);
code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num); code = initExprSupp(&pInfo->scalarSup, pCalExprInfo, num, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1257,7 +1257,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1257,7 +1257,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
} }
pInfo->tbnameCalSup.pExprInfo = pSubTableExpr; pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
createExprFromOneNode(pSubTableExpr, pPartNode->pSubtable, 0); createExprFromOneNode(pSubTableExpr, pPartNode->pSubtable, 0);
code = initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1); code = initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1271,7 +1271,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr ...@@ -1271,7 +1271,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) { if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore) != 0) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
......
...@@ -115,7 +115,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys ...@@ -115,7 +115,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
initResultSizeInfo(&pOperator->resultInfo, numOfRows); initResultSizeInfo(&pOperator->resultInfo, numOfRows);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState); pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -370,7 +370,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -370,7 +370,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
if (pPhyNode->pExprs != NULL) { if (pPhyNode->pExprs != NULL) {
int32_t num = 0; int32_t num = 0;
SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num); SExprInfo* pSExpr = createExprInfo(pPhyNode->pExprs, NULL, &num);
int32_t code = initExprSupp(&pInfo->scalarSup, pSExpr, num); int32_t code = initExprSupp(&pInfo->scalarSup, pSExpr, num, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -392,7 +392,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -392,7 +392,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
blockDataEnsureCapacity(pResBlock, numOfRows); blockDataEnsureCapacity(pResBlock, numOfRows);
int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str, int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState); pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -962,7 +962,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -962,7 +962,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
if (pScanNode->pScanPseudoCols != NULL) { if (pScanNode->pScanPseudoCols != NULL) {
SExprSupp* pSup = &pInfo->base.pseudoSup; SExprSupp* pSup = &pInfo->base.pseudoSup;
pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
} }
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
...@@ -2349,7 +2349,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2349,7 +2349,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->tbnameCalSup.pExprInfo = pSubTableExpr; pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0); createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) { if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1, &pTaskInfo->storageAPI.functionStore) != 0) {
tableListDestroy(pTableListInfo); tableListDestroy(pTableListInfo);
goto _error; goto _error;
} }
...@@ -2363,7 +2363,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2363,7 +2363,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
tableListDestroy(pTableListInfo); tableListDestroy(pTableListInfo);
goto _error; goto _error;
} }
if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) { if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo); tableListDestroy(pTableListInfo);
goto _error; goto _error;
...@@ -2612,7 +2612,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi ...@@ -2612,7 +2612,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
int32_t numOfExprs = 0; int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -3033,7 +3033,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -3033,7 +3033,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
if (pTableScanNode->scan.pScanPseudoCols != NULL) { if (pTableScanNode->scan.pScanPseudoCols != NULL) {
SExprSupp* pSup = &pInfo->base.pseudoSup; SExprSupp* pSup = &pInfo->base.pseudoSup;
pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
} }
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
......
...@@ -60,7 +60,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* ...@@ -60,7 +60,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
} }
pOperator->exprSupp.pCtx = pOperator->exprSupp.pCtx =
createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -500,7 +500,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort ...@@ -500,7 +500,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
pSup->numOfExprs = numOfCols; pSup->numOfExprs = numOfCols;
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
......
...@@ -2301,7 +2301,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi ...@@ -2301,7 +2301,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols); int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -818,7 +818,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode ...@@ -818,7 +818,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
int32_t numOfExprs = 0; int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs); SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs);
int32_t code = initExprSupp(pSup, pExprInfo, numOfExprs); int32_t code = initExprSupp(pSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -826,7 +826,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode ...@@ -826,7 +826,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
if (pInterpPhyNode->pExprs != NULL) { if (pInterpPhyNode->pExprs != NULL) {
int32_t num = 0; int32_t num = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num); SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num); code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -1656,7 +1656,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh ...@@ -1656,7 +1656,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
int32_t code = int32_t code =
initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState); initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1684,7 +1684,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh ...@@ -1684,7 +1684,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
if (pPhyNode->window.pExprs != NULL) { if (pPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0; int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar); SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1884,7 +1884,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -1884,7 +1884,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
if (pStateNode->window.pExprs != NULL) { if (pStateNode->window.pExprs != NULL) {
int32_t numOfScalarExpr = 0; int32_t numOfScalarExpr = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr); SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr);
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1910,7 +1910,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi ...@@ -1910,7 +1910,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState); pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -1979,7 +1979,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW ...@@ -1979,7 +1979,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState); pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -2744,7 +2744,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2744,7 +2744,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
if (pIntervalPhyNode->window.pExprs != NULL) { if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0; int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -2757,9 +2757,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2757,9 +2757,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pInfo->pState); pInfo->pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -2861,9 +2862,9 @@ void destroyStreamSessionAggOperatorInfo(void* param) { ...@@ -2861,9 +2862,9 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
} }
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock) { SSDataBlock* pResultBlock, SFunctionStateStore* pStore) {
initBasicInfo(pBasicInfo, pResultBlock); initBasicInfo(pBasicInfo, pResultBlock);
int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); int32_t code = initExprSupp(pSup, pExprInfo, numOfCols, pStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2936,6 +2937,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, ...@@ -2936,6 +2937,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx,
if (bufSize <= pageSize) { if (bufSize <= pageSize) {
bufSize = pageSize * 4; bufSize = pageSize * 4;
} }
if (!osTempSpaceAvailable()) { if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_DISKSPACE; terrno = TSDB_CODE_NO_DISKSPACE;
qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir); qError("Init stream agg supporter failed since %s, tempDir:%s", terrstr(), tsTempDir);
...@@ -3588,7 +3590,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3588,7 +3590,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
if (pSessionNode->window.pExprs != NULL) { if (pSessionNode->window.pExprs != NULL) {
int32_t numOfScalar = 0; int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -3597,7 +3599,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh ...@@ -3597,7 +3599,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4126,7 +4128,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4126,7 +4128,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
if (pStateNode->window.pExprs != NULL) { if (pStateNode->window.pExprs != NULL) {
int32_t numOfScalar = 0; int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar); SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4145,7 +4147,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4145,7 +4147,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4433,7 +4435,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -4433,7 +4435,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState); pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4719,7 +4721,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge ...@@ -4719,7 +4721,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, int32_t code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState); pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4764,7 +4766,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4764,7 +4766,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
...@@ -4948,7 +4950,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4948,7 +4950,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pInfo->pState); pInfo->pState, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -4956,7 +4958,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4956,7 +4958,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
if (pIntervalPhyNode->window.pExprs != NULL) { if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0; int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include "function.h" #include "function.h"
#include "query.h" #include "query.h"
#include "querynodes.h" #include "querynodes.h"
//#include "streamState.h"
#include "tcompare.h" #include "tcompare.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdigest.h" #include "tdigest.h"
...@@ -1697,7 +1696,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -1697,7 +1696,7 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i); GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
int32_t code = getPercentile(pMemBucket, v, &ppInfo->result); code = getPercentile(pMemBucket, v, &ppInfo->result);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _fin_error; goto _fin_error;
} }
...@@ -3120,7 +3119,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid ...@@ -3120,7 +3119,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
} }
static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, SWinKey* key, static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, SWinKey* key,
STuplePos* pPos) { STuplePos* pPos, SFunctionStateStore* pStore) {
STuplePos p = {0}; STuplePos p = {0};
if (pHandle->pBuf != NULL) { if (pHandle->pBuf != NULL) {
SFilePage* pPage = NULL; SFilePage* pPage = NULL;
...@@ -3153,11 +3152,10 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, ...@@ -3153,11 +3152,10 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
pPage->num += length; pPage->num += length;
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
} else { } else { // other tuple save policy
// other tuple save policy if (pStore->streamStateFuncPut(pHandle->pState, key, pBuf, length) >= 0) {
// if (streamStateFuncPut(pHandle->pState, key, pBuf, length) >= 0) { p.streamTupleKey = *key;
// p.streamTupleKey = *key; }
// }
} }
*pPos = p; *pPos = p;
...@@ -3179,10 +3177,10 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* ...@@ -3179,10 +3177,10 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
} }
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, &key, pPos); return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, &key, pPos, pCtx->pStore);
} }
static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) { static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos, SFunctionStateStore* pStore) {
if (pHandle->pBuf != NULL) { if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
if (pPage == NULL) { if (pPage == NULL) {
...@@ -3192,7 +3190,7 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf ...@@ -3192,7 +3190,7 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
} else { } else {
// streamStateFuncPut(pHandle->pState, &pPos->streamTupleKey, pBuf, length); pStore->streamStateFuncPut(pHandle->pState, &pPos->streamTupleKey, pBuf, length);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -3202,10 +3200,10 @@ int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc ...@@ -3202,10 +3200,10 @@ int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc
prepareBuf(pCtx); prepareBuf(pCtx);
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
return doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos); return doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos, pCtx->pStore);
} }
static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos) { static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos, SFunctionStateStore* pStore) {
if (pHandle->pBuf != NULL) { if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
if (pPage == NULL) { if (pPage == NULL) {
...@@ -3217,13 +3215,13 @@ static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPo ...@@ -3217,13 +3215,13 @@ static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPo
} else { } else {
void* value = NULL; void* value = NULL;
int32_t vLen; int32_t vLen;
// streamStateFuncGet(pHandle->pState, &pPos->streamTupleKey, &value, &vLen); pStore->streamStateFuncGet(pHandle->pState, &pPos->streamTupleKey, &value, &vLen);
return (char*)value; return (char*)value;
} }
} }
const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos) { const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos) {
return doLoadTupleData(&pCtx->saveHandle, pPos); return doLoadTupleData(&pCtx->saveHandle, pPos, pCtx->pStore);
} }
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
...@@ -4991,7 +4989,7 @@ static int32_t saveModeTupleData(SqlFunctionCtx* pCtx, char* data, SModeInfo *pI ...@@ -4991,7 +4989,7 @@ static int32_t saveModeTupleData(SqlFunctionCtx* pCtx, char* data, SModeInfo *pI
memcpy(pInfo->buf, data, pInfo->colBytes); memcpy(pInfo->buf, data, pInfo->colBytes);
} }
return doSaveTupleData(&pCtx->saveHandle, pInfo->buf, pInfo->colBytes, NULL, pPos); return doSaveTupleData(&pCtx->saveHandle, pInfo->buf, pInfo->colBytes, NULL, pPos, pCtx->pStore);
} }
static int32_t doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, char* data) { static int32_t doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, char* data) {
...@@ -5020,7 +5018,7 @@ static int32_t doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCt ...@@ -5020,7 +5018,7 @@ static int32_t doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCt
} else { } else {
pHashItem->count += 1; pHashItem->count += 1;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
int32_t code = updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &pHashItem->tuplePos); code = updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &pHashItem->tuplePos);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
#include "tcommon.h" #include "tcommon.h"
#include "tcompare.h" #include "tcompare.h"
#include "tref.h" #include "tref.h"
#include "ttimer.h"
#define MAX_TABLE_NAME_NUM 2000000 #define MAX_TABLE_NAME_NUM 2000000
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册