提交 4c6a8bbc 编写于 作者: L Liu Jicong

feat(stream): stream state support tuple

上级 e5da5802
...@@ -45,8 +45,8 @@ enum { ...@@ -45,8 +45,8 @@ enum {
// clang-format on // clang-format on
typedef struct { typedef struct {
TSKEY ts;
uint64_t groupId; uint64_t groupId;
TSKEY ts;
} SWinKey; } SWinKey;
static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
...@@ -68,6 +68,37 @@ static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, i ...@@ -68,6 +68,37 @@ static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, i
return 0; return 0;
} }
typedef struct {
uint64_t groupId;
TSKEY ts;
int32_t exprIdx;
} STupleKey;
static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
STupleKey* pTuple1 = (STupleKey*)pKey1;
STupleKey* pTuple2 = (STupleKey*)pKey2;
if (pTuple1->groupId > pTuple2->groupId) {
return 1;
} else if (pTuple1->groupId < pTuple2->groupId) {
return -1;
}
if (pTuple1->ts > pTuple2->ts) {
return 1;
} else if (pTuple1->ts < pTuple2->ts) {
return -1;
}
if (pTuple1->exprIdx > pTuple2->exprIdx) {
return 1;
} else if (pTuple1->exprIdx < pTuple2->exprIdx) {
return -1;
}
return 0;
}
enum { enum {
TMQ_MSG_TYPE__DUMMY = 0, TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP, TMQ_MSG_TYPE__POLL_RSP,
......
...@@ -34,66 +34,69 @@ typedef struct SFuncExecEnv { ...@@ -34,66 +34,69 @@ typedef struct SFuncExecEnv {
int32_t calcMemSize; int32_t calcMemSize;
} SFuncExecEnv; } SFuncExecEnv;
typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); typedef bool (*FExecGetEnv)(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx); typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx);
typedef struct SScalarFuncExecFuncs { typedef struct SScalarFuncExecFuncs {
FExecGetEnv getEnv; FExecGetEnv getEnv;
FScalarExecProcess process; FScalarExecProcess process;
} SScalarFuncExecFuncs; } SScalarFuncExecFuncs;
typedef struct SFuncExecFuncs { typedef struct SFuncExecFuncs {
FExecGetEnv getEnv; FExecGetEnv getEnv;
FExecInit init; FExecInit init;
FExecProcess process; FExecProcess process;
FExecFinalize finalize; FExecFinalize finalize;
FExecCombine combine; FExecCombine combine;
} SFuncExecFuncs; } SFuncExecFuncs;
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results #define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define TOP_BOTTOM_QUERY_LIMIT 100 #define TOP_BOTTOM_QUERY_LIMIT 100
#define FUNCTIONS_NAME_MAX_LENGTH 16 #define FUNCTIONS_NAME_MAX_LENGTH 16
typedef struct SResultRowEntryInfo { typedef struct SResultRowEntryInfo {
bool initialized:1; // output buffer has been initialized bool initialized : 1; // output buffer has been initialized
bool complete:1; // query has completed bool complete : 1; // query has completed
uint8_t isNullRes:6; // the result is null uint8_t isNullRes : 6; // the result is null
uint16_t numOfRes; // num of output result in current buffer. NOT NULL RESULT uint16_t numOfRes; // num of output result in current buffer. NOT NULL RESULT
} SResultRowEntryInfo; } SResultRowEntryInfo;
// determine the real data need to calculated the result // determine the real data need to calculated the result
enum { enum {
BLK_DATA_NOT_LOAD = 0x0, BLK_DATA_NOT_LOAD = 0x0,
BLK_DATA_SMA_LOAD = 0x1, BLK_DATA_SMA_LOAD = 0x1,
BLK_DATA_DATA_LOAD = 0x3, BLK_DATA_DATA_LOAD = 0x3,
BLK_DATA_FILTEROUT = 0x4, // discard current data block since it is not qualified for filter BLK_DATA_FILTEROUT = 0x4, // discard current data block since it is not qualified for filter
}; };
enum { enum {
MAIN_SCAN = 0x0u, MAIN_SCAN = 0x0u,
REVERSE_SCAN = 0x1u, // todo remove it REVERSE_SCAN = 0x1u, // todo remove it
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan REPEAT_SCAN = 0x2u, // repeat scan belongs to the master scan
MERGE_STAGE = 0x20u, MERGE_STAGE = 0x20u,
}; };
typedef struct SPoint1 { typedef struct SPoint1 {
int64_t key; int64_t key;
union{double val; char* ptr;}; union {
double val;
char *ptr;
};
} SPoint1; } SPoint1;
struct SqlFunctionCtx; struct SqlFunctionCtx;
struct SResultRowEntryInfo; struct SResultRowEntryInfo;
//for selectivity query, the corresponding tag value is assigned if the data is qualified // for selectivity query, the corresponding tag value is assigned if the data is qualified
typedef struct SSubsidiaryResInfo { typedef struct SSubsidiaryResInfo {
int16_t num; int16_t num;
int32_t rowLen; int32_t rowLen;
char* buf; // serialize data buffer char *buf; // serialize data buffer
struct SqlFunctionCtx **pCtx; struct SqlFunctionCtx **pCtx;
} SSubsidiaryResInfo; } SSubsidiaryResInfo;
...@@ -106,69 +109,70 @@ typedef struct SResultDataInfo { ...@@ -106,69 +109,70 @@ typedef struct SResultDataInfo {
} SResultDataInfo; } SResultDataInfo;
#define GET_RES_INFO(ctx) ((ctx)->resultInfo) #define GET_RES_INFO(ctx) ((ctx)->resultInfo)
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowEntryInfo))) #define GET_ROWCELL_INTERBUF(_c) ((void *)((char *)(_c) + sizeof(SResultRowEntryInfo)))
typedef struct SInputColumnInfoData { typedef struct SInputColumnInfoData {
int32_t totalRows; // total rows in current columnar data int32_t totalRows; // total rows in current columnar data
int32_t startRowIndex; // handle started row index int32_t startRowIndex; // handle started row index
int32_t numOfRows; // the number of rows needs to be handled int32_t numOfRows; // the number of rows needs to be handled
int32_t numOfInputCols; // PTS is not included int32_t numOfInputCols; // PTS is not included
bool colDataAggIsSet;// if agg is set or not bool colDataAggIsSet; // if agg is set or not
SColumnInfoData *pPTS; // primary timestamp column SColumnInfoData *pPTS; // primary timestamp column
SColumnInfoData **pData; SColumnInfoData **pData;
SColumnDataAgg **pColumnDataAgg; SColumnDataAgg **pColumnDataAgg;
uint64_t uid; // table uid, used to set the tag value when building the final query result for selectivity functions. uint64_t uid; // table uid, used to set the tag value when building the final query result for selectivity functions.
} SInputColumnInfoData; } SInputColumnInfoData;
typedef struct SSerializeDataHandle { typedef struct SSerializeDataHandle {
struct SDiskbasedBuf* pBuf; struct SDiskbasedBuf *pBuf;
int32_t currentPage; int32_t currentPage;
void *pState;
} SSerializeDataHandle; } SSerializeDataHandle;
// sql function runtime context // sql function runtime context
typedef struct SqlFunctionCtx { typedef struct SqlFunctionCtx {
SInputColumnInfoData input; SInputColumnInfoData input;
SResultDataInfo resDataInfo; SResultDataInfo resDataInfo;
uint32_t order; // data block scanner order: asc|desc uint32_t order; // data block scanner order: asc|desc
uint8_t scanFlag; // record current running step, default: 0 uint8_t scanFlag; // record current running step, default: 0
int16_t functionId; // function id int16_t functionId; // function id
char *pOutput; // final result output buffer, point to sdata->data char *pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams; int32_t numOfParams;
SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset; int32_t offset;
struct SResultRowEntryInfo *resultInfo; struct SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaries; SSubsidiaryResInfo subsidiaries;
SPoint1 start; SPoint1 start;
SPoint1 end; SPoint1 end;
SFuncExecFuncs fpSet; SFuncExecFuncs fpSet;
SScalarFuncExecFuncs sfp; SScalarFuncExecFuncs sfp;
struct SExprInfo *pExpr; struct SExprInfo *pExpr;
struct SSDataBlock *pSrcBlock; struct SSDataBlock *pSrcBlock;
struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
SSerializeDataHandle saveHandle; SSerializeDataHandle saveHandle;
bool isStream; bool isStream;
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
} SqlFunctionCtx; } SqlFunctionCtx;
enum { enum {
TEXPR_BINARYEXPR_NODE= 0x1, TEXPR_BINARYEXPR_NODE = 0x1,
TEXPR_UNARYEXPR_NODE = 0x2, TEXPR_UNARYEXPR_NODE = 0x2,
}; };
typedef struct tExprNode { typedef struct tExprNode {
int32_t nodeType; int32_t nodeType;
union { union {
struct {// function node struct { // function node
char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor
int32_t functionId; int32_t functionId;
int32_t num; int32_t num;
struct SFunctionNode *pFunctNode; struct SFunctionNode *pFunctNode;
} _function; } _function;
struct { struct {
struct SNode* pRootNode; struct SNode *pRootNode;
} _optrRoot; } _optrRoot;
}; };
} tExprNode; } tExprNode;
...@@ -182,17 +186,18 @@ struct SScalarParam { ...@@ -182,17 +186,18 @@ struct SScalarParam {
int32_t numOfRows; int32_t numOfRows;
}; };
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell); void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell);
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock); int32_t getNumOfResult(SqlFunctionCtx *pCtx, int32_t num, SSDataBlock *pResBlock);
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry); bool isRowEntryCompleted(struct SResultRowEntryInfo *pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry); bool isRowEntryInitialized(struct SResultRowEntryInfo *pEntry);
typedef struct SPoint { typedef struct SPoint {
int64_t key; int64_t key;
void * val; void *val;
} SPoint; } SPoint;
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType); int32_t taosGetLinearInterpolationVal(SPoint *point, int32_t outputType, SPoint *point1, SPoint *point2,
int32_t inputType);
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// udf api // udf api
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdatablock.h"
#include "tdbInt.h"
#ifdef __cplusplus
extern "C" {
#endif
#ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_
typedef struct SStreamTask SStreamTask;
// incremental state storage
typedef struct {
SStreamTask* pOwner;
TDB* db;
TTB* pStateDb;
TTB* pFuncStateDb;
TXN txn;
} SStreamState;
SStreamState* streamStateOpen(char* path, SStreamTask* pTask);
void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);
int32_t streamStateAbort(SStreamState* pState);
typedef struct {
TBC* pCur;
} SStreamStateCur;
#if 1
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key);
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal);
void streamFreeVal(void* val);
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key);
void streamStateFreeCur(SStreamStateCur* pCur);
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
#endif
#ifdef __cplusplus
}
#endif
#endif /* ifndef _STREAM_STATE_H_ */
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "executor.h" #include "executor.h"
#include "os.h" #include "os.h"
#include "query.h" #include "query.h"
#include "streamState.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdbInt.h" #include "tdbInt.h"
#include "tmsg.h" #include "tmsg.h"
...@@ -263,14 +264,6 @@ typedef struct { ...@@ -263,14 +264,6 @@ typedef struct {
SArray* checkpointVer; SArray* checkpointVer;
} SStreamRecoveringState; } SStreamRecoveringState;
// incremental state storage
typedef struct {
SStreamTask* pOwner;
TDB* db;
TTB* pStateDb;
TXN txn;
} SStreamState;
typedef struct SStreamTask { typedef struct SStreamTask {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
...@@ -540,39 +533,6 @@ int32_t streamMetaCommit(SStreamMeta* pMeta); ...@@ -540,39 +533,6 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta);
SStreamState* streamStateOpen(char* path, SStreamTask* pTask);
void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);
int32_t streamStateAbort(SStreamState* pState);
typedef struct {
TBC* pCur;
} SStreamStateCur;
#if 1
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal);
void streamFreeVal(void* val);
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key);
void streamStateFreeCur(SStreamStateCur* pCur);
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -14,7 +14,7 @@ target_include_directories( ...@@ -14,7 +14,7 @@ target_include_directories(
target_link_libraries( target_link_libraries(
function function
PRIVATE os util common nodes scalar qcom transport PRIVATE os util common nodes scalar qcom transport stream
PUBLIC uv_a PUBLIC uv_a
) )
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "function.h" #include "function.h"
#include "query.h" #include "query.h"
#include "querynodes.h" #include "querynodes.h"
#include "streamState.h"
#include "tcompare.h" #include "tcompare.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdigest.h" #include "tdigest.h"
...@@ -56,8 +57,13 @@ typedef struct SAvgRes { ...@@ -56,8 +57,13 @@ typedef struct SAvgRes {
} SAvgRes; } SAvgRes;
typedef struct STuplePos { typedef struct STuplePos {
int32_t pageId; union {
int32_t offset; struct {
int32_t pageId;
int32_t offset;
};
STupleKey streamTupleKey;
};
} STuplePos; } STuplePos;
typedef struct SMinmaxResInfo { typedef struct SMinmaxResInfo {
...@@ -1146,7 +1152,8 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { ...@@ -1146,7 +1152,8 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true; return true;
} }
static STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock); static STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock,
const STupleKey* pKey);
static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos); static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos);
...@@ -1201,7 +1208,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1201,7 +1208,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
pBuf->v = *(int64_t*)tval; pBuf->v = *(int64_t*)tval;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL);
} }
} else { } else {
if (IS_SIGNED_NUMERIC_TYPE(type) || IS_TIMESTAMP_TYPE(type)) { if (IS_SIGNED_NUMERIC_TYPE(type) || IS_TIMESTAMP_TYPE(type)) {
...@@ -1213,7 +1220,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1213,7 +1220,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
*(int64_t*)&pBuf->v = val; *(int64_t*)&pBuf->v = val;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL);
} }
} }
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
...@@ -1225,7 +1232,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1225,7 +1232,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
*(uint64_t*)&pBuf->v = val; *(uint64_t*)&pBuf->v = val;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL);
} }
} }
} else if (type == TSDB_DATA_TYPE_DOUBLE) { } else if (type == TSDB_DATA_TYPE_DOUBLE) {
...@@ -1237,7 +1244,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1237,7 +1244,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
*(double*)&pBuf->v = val; *(double*)&pBuf->v = val;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL);
} }
} }
} else if (type == TSDB_DATA_TYPE_FLOAT) { } else if (type == TSDB_DATA_TYPE_FLOAT) {
...@@ -1251,7 +1258,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1251,7 +1258,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL);
} }
} }
} }
...@@ -1276,7 +1283,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1276,7 +1283,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
...@@ -1307,7 +1314,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1307,7 +1314,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
...@@ -1338,7 +1345,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1338,7 +1345,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
...@@ -1369,7 +1376,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1369,7 +1376,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
...@@ -1402,7 +1409,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1402,7 +1409,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
...@@ -1433,7 +1440,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1433,7 +1440,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
...@@ -1464,7 +1471,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1464,7 +1471,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
...@@ -1495,7 +1502,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1495,7 +1502,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
...@@ -1527,7 +1534,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1527,7 +1534,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
...@@ -1558,7 +1565,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1558,7 +1565,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
...@@ -1581,7 +1588,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1581,7 +1588,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
_min_max_over: _min_max_over:
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) {
pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL);
pBuf->nullTupleSaved = true; pBuf->nullTupleSaved = true;
} }
return numOfElems; return numOfElems;
...@@ -2758,7 +2765,7 @@ static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowInde ...@@ -2758,7 +2765,7 @@ static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowInde
} }
if (!pInfo->hasResult) { if (!pInfo->hasResult) {
pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock); pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock, NULL);
} else { } else {
updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
} }
...@@ -3426,7 +3433,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { ...@@ -3426,7 +3433,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
} }
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL);
pRes->nullTupleSaved = true; pRes->nullTupleSaved = true;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -3454,7 +3461,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { ...@@ -3454,7 +3461,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
} }
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL);
pRes->nullTupleSaved = true; pRes->nullTupleSaved = true;
} }
...@@ -3506,7 +3513,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData ...@@ -3506,7 +3513,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// save the data of this tuple // save the data of this tuple
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock); pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock, NULL);
} }
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId,
...@@ -3578,7 +3585,8 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid ...@@ -3578,7 +3585,8 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
return buf; return buf;
} }
static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length) { static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length,
const STupleKey* pKey) {
STuplePos p = {0}; STuplePos p = {0};
if (pHandle->pBuf != NULL) { if (pHandle->pBuf != NULL) {
SFilePage* pPage = NULL; SFilePage* pPage = NULL;
...@@ -3604,12 +3612,16 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf ...@@ -3604,12 +3612,16 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
} else { } else {
// other tuple save policy // other tuple save policy
if (streamStateFuncPut(pHandle->pState, pKey, pBuf, length) < 0) {
ASSERT(0);
}
p.streamTupleKey = *pKey;
} }
return p; return p;
} }
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock) { STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, const STupleKey* pKey) {
if (pCtx->subsidiaries.rowLen == 0) { if (pCtx->subsidiaries.rowLen == 0) {
int32_t rowLen = 0; int32_t rowLen = 0;
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
...@@ -3622,7 +3634,7 @@ STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc ...@@ -3622,7 +3634,7 @@ STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc
} }
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen); return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pKey);
} }
static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) { static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) {
...@@ -3632,6 +3644,7 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf ...@@ -3632,6 +3644,7 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
} else { } else {
streamStateFuncPut(pHandle->pState, &pPos->streamTupleKey, pBuf, length);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -3650,7 +3663,10 @@ static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPo ...@@ -3650,7 +3663,10 @@ static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPo
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
return p; return p;
} else { } else {
return NULL; void* value = NULL;
int32_t vLen;
streamStateFuncGet(pHandle->pState, &pPos->streamTupleKey, &value, &vLen);
return (char*)value;
} }
} }
...@@ -4981,7 +4997,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da ...@@ -4981,7 +4997,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
if (pInfo->numSampled < pInfo->samples) { if (pInfo->numSampled < pInfo->samples) {
sampleAssignResult(pInfo, data, pInfo->numSampled); sampleAssignResult(pInfo, data, pInfo->numSampled);
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock); pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL);
} }
pInfo->numSampled++; pInfo->numSampled++;
} else { } else {
...@@ -5012,7 +5028,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { ...@@ -5012,7 +5028,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
} }
if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock); pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL);
pInfo->nullTupleSaved = true; pInfo->nullTupleSaved = true;
} }
......
...@@ -35,6 +35,10 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { ...@@ -35,6 +35,10 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
goto _err; goto _err;
} }
if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb) < 0) {
goto _err;
}
if (streamStateBegin(pState) < 0) { if (streamStateBegin(pState) < 0) {
goto _err; goto _err;
} }
...@@ -44,8 +48,9 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { ...@@ -44,8 +48,9 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
return pState; return pState;
_err: _err:
if (pState->pStateDb) tdbTbClose(pState->pStateDb); tdbTbClose(pState->pStateDb);
if (pState->db) tdbClose(pState->db); tdbTbClose(pState->pFuncStateDb);
tdbClose(pState->db);
taosMemoryFree(pState); taosMemoryFree(pState);
return NULL; return NULL;
} }
...@@ -53,6 +58,7 @@ _err: ...@@ -53,6 +58,7 @@ _err:
void streamStateClose(SStreamState* pState) { void streamStateClose(SStreamState* pState) {
tdbCommit(pState->db, &pState->txn); tdbCommit(pState->db, &pState->txn);
tdbTbClose(pState->pStateDb); tdbTbClose(pState->pStateDb);
tdbTbClose(pState->pFuncStateDb);
tdbClose(pState->db); tdbClose(pState->db);
taosMemoryFree(pState); taosMemoryFree(pState);
...@@ -101,6 +107,17 @@ int32_t streamStateAbort(SStreamState* pState) { ...@@ -101,6 +107,17 @@ int32_t streamStateAbort(SStreamState* pState) {
return 0; return 0;
} }
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, &pState->txn);
}
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
return tdbTbGet(pState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
}
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
return tdbTbDelete(pState->pFuncStateDb, key, sizeof(STupleKey), &pState->txn);
}
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
return tdbTbUpsert(pState->pStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn); return tdbTbUpsert(pState->pStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册