提交 429b5cd6 编写于 作者: 5 54liuyao

feat:add stream file state

上级 628bb62c
......@@ -165,7 +165,8 @@ extern int32_t tsUptimeInterval;
extern int32_t tsRpcRetryLimit;
extern int32_t tsRpcRetryInterval;
extern bool tsDisableStream;
extern bool tsDisableStream;
extern int64_t tsStreamBufferSize;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
......
......@@ -17,6 +17,7 @@
#include "rocksdb/c.h"
#include "tdbInt.h"
#include "tstreamFileState.h"
#ifdef __cplusplus
extern "C" {
......@@ -54,6 +55,7 @@ typedef struct STdbState {
// incremental state storage
typedef struct {
STdbState* pTdbState;
SStreamFileState* pFileState;
int32_t number;
} SStreamState;
......@@ -61,7 +63,6 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);
int32_t streamStateAbort(SStreamState* pState);
void streamStateDestroy(SStreamState* pState);
typedef struct {
......@@ -126,9 +127,6 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname);
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);
int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen);
/***compare func **/
// todo refactor
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_FILE_STATE_H_
#define _STREAM_FILE_STATE_H_
#include "os.h"
#include "tdef.h"
#include "tlist.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SStreamFileState SStreamFileState;
/*
 * Handle for one cached row: ties a row buffer to the key it belongs to.
 */
typedef struct SRowBuffPos {
void* pRowBuff;  // row payload buffer; NULL while no buffer is attached to this position
void* pKey;  // key of the row; passed to the expiry predicate and to the backing-store put/get calls
bool beFlushed;  // NOTE(review): presumably marks rows already written to the backing store -- confirm
bool beUsed;  // in-use marker; positions with this set are skipped when rows are picked for flushing
} SRowBuffPos;
typedef SList SStreamSnapshot;
typedef bool (*ExpiredFun)(void*, TSKEY);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, ExpiredFun fp, void* pFile);
void destroyStreamFileState(SStreamFileState* pFileState);
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
void* getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos);
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize);
int32_t recoverSnapshot(SStreamFileState* pFileState);
#ifdef __cplusplus
}
#endif
#endif // _STREAM_FILE_STATE_H_
......@@ -17,6 +17,7 @@
#define _TD_UTIL_LIST_H_
#include "os.h"
#include "talgo.h"
#ifdef __cplusplus
extern "C" {
......@@ -222,6 +223,7 @@ void tdListInit(SList *list, int32_t eleSize);
void tdListEmpty(SList *list);
SList *tdListNew(int32_t eleSize);
void *tdListFree(SList *list);
void *tdListFreeP(SList *list, FDelete fp);
void tdListPrependNode(SList *list, SListNode *node);
void tdListAppendNode(SList *list, SListNode *node);
int32_t tdListPrepend(SList *list, void *data);
......
......@@ -196,6 +196,7 @@ int32_t tsUptimeInterval = 300; // seconds
char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits
char tsUdfdLdLibPath[512] = "";
bool tsDisableStream = false;
int64_t tsStreamBufferSize = 128 * 1024 * 1024;
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
......@@ -496,6 +497,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1;
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1;
// streamBufferSize is a 64-bit byte count, so it must be registered as an int64 item;
// registering it with cfgAddBool would reduce the value to true/false.
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, 0) != 0) return -1;
GRANT_CFG_ADD;
return 0;
......@@ -824,7 +826,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
}
// "disableStream" is registered with cfgAddBool, so read the bool member of the item
// (reading it through the i64 member picks up bytes the bool setter never wrote).
// NOTE(review): tsStreamBufferSize is not loaded from the "streamBufferSize" item in the
// visible code -- confirm whether it should be read here as well.
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
GRANT_CFG_GET;
return 0;
......
......@@ -865,7 +865,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
......
......@@ -2723,7 +2723,7 @@ int32_t streamStateAddIfNotExist2(SStreamState* pState, const SWinKey* key, void
return 0;
}
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
SWinKey key = {
.ts = win->skey,
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorimpl.h"
#include "tglobal.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
......@@ -2151,7 +2152,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S
continue;
}
if (num == 0) {
int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup);
ASSERT(pCurResult != NULL);
if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
......@@ -2160,7 +2161,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S
}
num++;
SResultRow* pChResult = NULL;
setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs,
setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs,
pChildSup->rowEntryInfoOffset, &pChInfo->aggSup);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true);
compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
......@@ -2412,7 +2413,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
}
}
int32_t code = setOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput,
int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
......@@ -4853,6 +4854,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return NULL;
}
/*
 * Expiry predicate handed to streamFileStateInit.
 * Interprets pKey as an SWinKey and reports whether its timestamp is strictly
 * older than `mark`.
 */
bool compareTs(void* pKey, TSKEY mark) { return ((SWinKey*)pKey)->ts < mark; }
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo) {
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
......@@ -4939,6 +4945,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs, pInfo->pState);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
......
......@@ -263,22 +263,6 @@ int32_t streamStateCommit(SStreamState* pState) {
#endif
}
/*
 * Abort the current state transaction.
 * With USE_ROCKSDB defined this does nothing and reports success. Otherwise the
 * open tdb transaction is aborted and a fresh write / read-uncommitted
 * transaction is begun in its place.
 * Returns 0 on success, -1 when either tdb call reports failure.
 */
int32_t streamStateAbort(SStreamState* pState) {
#ifdef USE_ROCKSDB
return 0;
#else
if (tdbAbort(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
return -1;
}
// Open the replacement transaction so pTdbState->txn is set again after the abort.
if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
return 0;
#endif
}
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
return streamStateFuncPut_rocksdb(pState, key, value, vLen);
......@@ -305,7 +289,8 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
// todo refactor
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
return streamStatePut_rocksdb(pState, key, value, vLen);
return 0;
// return streamStatePut_rocksdb(pState, key, value, vLen);
#else
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->pTdbState->txn);
......@@ -315,7 +300,8 @@ int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* val
// todo refactor
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateGet_rocksdb(pState, key, pVal, pVLen);
return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen);
// return streamStateGet_rocksdb(pState, key, pVal, pVLen);
#else
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
......@@ -1033,22 +1019,6 @@ _end:
#endif
}
/*
 * Store the tag blob (`tag`, `tagLen` bytes) for `groupId`.
 * With USE_ROCKSDB defined the call is forwarded to streamStatePutParTag_rocksdb;
 * otherwise the blob is upserted into pParTagDb keyed by the 8-byte group id,
 * inside the current tdb transaction.
 * Returns whatever the selected backend call returns.
 */
int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
#ifdef USE_ROCKSDB
return streamStatePutParTag_rocksdb(pState, groupId, tag, tagLen);
#else
return tdbTbUpsert(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tag, tagLen, pState->pTdbState->txn);
#endif
}
/*
 * Look up the tag blob stored for `groupId`; the result is reported through
 * `tagVal` / `tagLen`.
 * With USE_ROCKSDB defined the call is forwarded to streamStateGetParTag_rocksdb;
 * otherwise pParTagDb is queried with the 8-byte group id as key.
 * Returns whatever the selected backend call returns.
 * NOTE(review): ownership of *tagVal is decided by the backend -- confirm who frees it.
 */
int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
#ifdef USE_ROCKSDB
return streamStateGetParTag_rocksdb(pState, groupId, tagVal, tagLen);
#else
return tdbTbGet(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tagVal, tagLen);
#endif
}
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
qWarn("try to write to cf parname");
#ifdef USE_ROCKSDB
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tstreamFileState.h"
#include "taos.h"
#include "thash.h"
#include "tsimplehash.h"
#include "streamBackendRocksdb.h"
// Fraction of the currently allocated rows that one flush pass tries to persist.
#define FLUSH_RATIO 0.2
// Fallback memory budget in bytes (128 MiB) used when the caller passes a non-positive size.
// No trailing ';' so the macro can be used inside larger expressions.
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
/*
 * In-memory row cache placed in front of the persistent stream state store.
 */
struct SStreamFileState {
SList* usedBuffs;  // list of SRowBuffPos* handed out by getNewRowPos
SList* freeBuffs;  // list of recycled row buffers (void*) waiting for reuse
SSHashObj* rowBuffMap;  // key bytes -> SRowBuffPos*
void* pFileStore;  // opaque backing-store handle passed to the rocksdb put/get helpers
int32_t rowSize;  // size in bytes of one row buffer
int32_t keyLen;  // NOTE(review): never assigned in the visible code -- presumably the key size; confirm
uint64_t preCheckPointVersion;  // initialised to 0; not otherwise used in the visible code
uint64_t checkPointVersion;  // initialised to 1; not otherwise used in the visible code
TSKEY maxTs;  // together with deleteMark forms the expiry threshold (maxTs - deleteMark)
TSKEY deleteMark;  // NOTE(review): neither maxTs nor deleteMark is assigned in the visible code -- confirm who sets them
uint64_t maxRowCount;  // memory budget / rowSize: cap on freshly allocated row buffers
uint64_t curRowCount;  // number of row buffers allocated so far
ExpiredFun expFunc;  // predicate deciding whether a key is older than a timestamp
};
// Alias of SRowBuffPos.
typedef SRowBuffPos SRowBuffInfo;
/*
 * Create a row cache that allocates at most `memSize / rowSize` row buffers.
 *
 * memSize  memory budget in bytes; values <= 0 fall back to DEFAULT_MAX_STREAM_BUFFER_SIZE
 * rowSize  size in bytes of one row buffer; 0 is rejected
 * fp       predicate used to decide whether a cached key has expired
 * pFile    opaque backing-store handle, stored as-is and passed to the put/get helpers
 *
 * Returns the new state, or NULL when rowSize is 0 or an allocation fails.
 */
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, ExpiredFun fp, void* pFile) {
  // Bail out before the state exists, so no cleanup path ever sees an
  // uninitialized or NULL state pointer.
  if (rowSize == 0) {
    return NULL;
  }
  if (memSize <= 0) {
    memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
  }

  SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState));
  if (!pFileState) {
    return NULL;
  }

  pFileState->usedBuffs = tdListNew(POINTER_BYTES);
  pFileState->freeBuffs = tdListNew(POINTER_BYTES);
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pFileState->rowBuffMap = tSimpleHashInit(1024, hashFn);
  if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) {
    goto _error;
  }

  pFileState->rowSize = rowSize;
  pFileState->preCheckPointVersion = 0;
  pFileState->checkPointVersion = 1;
  pFileState->pFileStore = pFile;
  pFileState->expFunc = fp;
  pFileState->maxRowCount = memSize / rowSize;
  pFileState->curRowCount = 0;
  return pFileState;

_error:
  // Both lists are still empty here, so releasing the containers is enough.
  // The state struct itself is released too, otherwise it would leak.
  if (pFileState->usedBuffs) {
    tdListFree(pFileState->usedBuffs);
  }
  if (pFileState->freeBuffs) {
    tdListFree(pFileState->freeBuffs);
  }
  if (pFileState->rowBuffMap) {
    tSimpleHashCleanup(pFileState->rowBuffMap);
  }
  taosMemoryFree(pFileState);
  return NULL;
}
/*
 * Release a row position together with the row buffer attached to it.
 * A NULL position is ignored: the used list stores whatever pointer the
 * allocator returned, so a NULL entry can reach this function.
 * NOTE(review): pKey is not released here -- confirm who owns the key memory.
 */
void destroyRowBuffPos(SRowBuffPos* pPos) {
  if (!pPos) {
    return;
  }
  taosMemoryFreeClear(pPos->pRowBuff);
  taosMemoryFree(pPos);
}
/*
 * List-element destructor for lists that store SRowBuffPos* values.
 * `ptr` is the address of the stored pointer; the position it refers to is
 * destroyed. A NULL `ptr` is ignored.
 */
void destroyRowBuffPosPtr(void* ptr) {
  if (ptr != NULL) {
    destroyRowBuffPos(*(SRowBuffPos**)ptr);
  }
}
/*
 * List-element destructor for freeBuffs.
 * Each node's payload is a slot holding the buffer pointer, so the pointer
 * must be read out of the slot before it is freed; freeing the slot address
 * itself would hand the allocator a pointer into the middle of the list node.
 */
static void destroyFreeRowBuffPtr(void* ptr) {
  if (ptr) {
    taosMemoryFree(*(void**)ptr);
  }
}

/*
 * Tear down a file state: every tracked row position, every recycled row
 * buffer, the key map and finally the state object itself.
 * A NULL state is ignored.
 */
void destroyStreamFileState(SStreamFileState* pFileState) {
  if (!pFileState) {
    return;
  }
  tdListFreeP(pFileState->usedBuffs, destroyRowBuffPosPtr);
  tdListFreeP(pFileState->freeBuffs, destroyFreeRowBuffPtr);
  tSimpleHashCleanup(pFileState->rowBuffMap);
  // The struct is opaque to callers, so this function is the only place that can release it.
  taosMemoryFree(pFileState);
}
/*
 * Recycle every tracked row whose key the expiry predicate reports as older
 * than `ts`: the row buffer moves to freeBuffs and the position is destroyed.
 *
 * The list node is unlinked before the position is freed, so usedBuffs never
 * keeps a pointer to a destroyed position.
 * NOTE(review): unlinking inside the loop relies on tdListNext having already
 * advanced past the returned node -- confirm against tlist.c.
 * NOTE(review): the rowBuffMap entry for the key is not removed here, so the
 * map can still return the destroyed position -- confirm the intended key
 * length and remove the entry as well.
 */
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts) {
  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&iter)) != NULL) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    if (!pFileState->expFunc(pPos->pKey, ts)) {
      continue;
    }

    tdListPopNode(pFileState->usedBuffs, pNode);
    taosMemoryFree(pNode);

    // Only real buffers go to the free list; a NULL entry there would later be
    // handed out as if it were a usable buffer.
    if (pPos->pRowBuff) {
      tdListAppend(pFileState->freeBuffs, &pPos->pRowBuff);
      pPos->pRowBuff = NULL;
    }
    destroyRowBuffPos(pPos);
  }
}
/*
 * Write up to FLUSH_RATIO of the allocated rows to the backing store, taking
 * rows from the head of usedBuffs that are not marked in use.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the temporary list cannot be created,
 * TSDB_CODE_SUCCESS otherwise.
 * NOTE(review): the result of flushSnapshot is not propagated, and flushed
 * buffers are not moved to freeBuffs, so a flush by itself frees no memory --
 * confirm the intended behaviour.
 */
int32_t flushRowBuff(SStreamFileState* pFileState) {
  SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
  if (!pFlushList) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
  uint64_t i = 0;

  SListIter iter = {0};
  tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);

  SListNode* pNode = NULL;
  while ((pNode = tdListNext(&iter)) != NULL && i < num) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
    if (!pPos->beUsed) {
      tdListAppend(pFlushList, &pPos);
      i++;
    }
  }

  flushSnapshot(pFileState->pFileStore, pFlushList, pFileState->rowSize);

  // The list only holds copies of the position pointers; release its nodes and
  // the container (not the positions) so the temporary list does not leak.
  tdListFree(pFlushList);
  return TSDB_CODE_SUCCESS;
}
/*
 * Try to make row buffers available: first recycle rows older than
 * (maxTs - deleteMark); if the free list is still empty afterwards, fall back
 * to a flush pass and return its result.
 */
int32_t clearRowBuff(SStreamFileState* pFileState) {
  TSKEY expireTs = pFileState->maxTs - pFileState->deleteMark;
  clearExpiredRowBuff(pFileState, expireTs);

  if (!isListEmpty(pFileState->freeBuffs)) {
    return TSDB_CODE_SUCCESS;
  }
  return flushRowBuff(pFileState);
}
/*
 * Pop the first entry of a pointer list and return the stored pointer.
 * The list node is freed here; ownership of the returned buffer passes to the
 * caller. Returns NULL when the list is empty.
 */
void* getFreeBuff(SList* lists) {
  void*      pBuff = NULL;
  SListNode* pHead = tdListPopHead(lists);
  if (pHead != NULL) {
    pBuff = *(void**)pHead->data;
    taosMemoryFree(pHead);
  }
  return pBuff;
}
/*
 * Create a new row position, register it in usedBuffs and attach a row buffer.
 *
 * Buffer sources, in order: a recycled buffer from freeBuffs; a freshly
 * allocated buffer while curRowCount is below maxRowCount; whatever
 * clearRowBuff manages to reclaim.
 *
 * Returns NULL when the position itself cannot be allocated. The returned
 * position may still have a NULL pRowBuff when no buffer could be obtained.
 */
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
  SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
  if (!pPos) {
    // Nothing has been registered yet, so there is nothing to undo.
    return NULL;
  }
  tdListAppend(pFileState->usedBuffs, &pPos);

  void* pBuff = getFreeBuff(pFileState->freeBuffs);
  if (pBuff) {
    pPos->pRowBuff = pBuff;
    return pPos;
  }

  if (pFileState->curRowCount < pFileState->maxRowCount) {
    pBuff = taosMemoryCalloc(1, pFileState->rowSize);
    if (pBuff) {
      pPos->pRowBuff = pBuff;
      pFileState->curRowCount++;
      return pPos;
    }
  }

  int32_t code = clearRowBuff(pFileState);
  ASSERT(code == 0);
  pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs);
  return pPos;
}
/*
 * Look up the row position for a key, creating one on first use.
 *
 * pKey/keyLen  key bytes used for the map lookup
 * pVal         receives the SRowBuffPos* for the key (not the raw row buffer)
 * pVLen        receives the row size in bytes
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a new position
 * cannot be allocated (outputs are left untouched in that case).
 * NOTE(review): the new position's pKey is not populated here although the
 * expiry check and the backing-store put/get read it -- confirm where the key
 * is attached.
 */
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
  SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen);
  if (pos) {
    *pVLen = pFileState->rowSize;
    *pVal = *pos;
    return TSDB_CODE_SUCCESS;
  }

  SRowBuffPos* pNewPos = getNewRowPos(pFileState);
  if (!pNewPos) {
    // Report the failure instead of caching a NULL position under this key.
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tSimpleHashPut(pFileState->rowBuffMap, pKey, keyLen, &pNewPos, POINTER_BYTES);
  *pVLen = pFileState->rowSize;
  *pVal = pNewPos;
  return TSDB_CODE_SUCCESS;
}
/*
 * Return the row buffer of a position, re-attaching one when the position
 * currently has none.
 *
 * When a buffer has to be re-attached, one is reclaimed via clearRowBuff and
 * the row is read back from the backing store by key.
 * Returns NULL when no buffer could be reclaimed.
 */
void* getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos) {
  if (pPos->pRowBuff) {
    return pPos->pRowBuff;
  }

  int32_t code = clearRowBuff(pFileState);
  ASSERT(code == 0);
  pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs);
  if (!pPos->pRowBuff) {
    // Nothing could be reclaimed; do not copy into a NULL destination.
    return NULL;
  }

  void*   pVal = NULL;
  int32_t len = 0;
  streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, &pVal, &len);
  if (pVal != NULL && len > 0) {
    // Row buffers are rowSize bytes long; never copy more than that.
    int32_t copyLen = (len < pFileState->rowSize) ? len : pFileState->rowSize;
    memcpy(pPos->pRowBuff, pVal, copyLen);
  }
  taosMemoryFree(pVal);
  return pPos->pRowBuff;
}
/*
 * Clear the in-use marker of a row position, which makes it eligible again
 * for selection by the flush pass.
 */
void releaseRowBuffPos(SRowBuffPos* pBuff) { pBuff->beUsed = false; }
/*
 * Drop rows older than (maxTs - deleteMark), then return the list of tracked
 * row positions. The returned list is the state's own usedBuffs list, not a
 * copy.
 */
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
  TSKEY expireTs = pFileState->maxTs - pFileState->deleteMark;
  clearExpiredRowBuff(pFileState, expireTs);

  SStreamSnapshot* pSnapshot = pFileState->usedBuffs;
  return pSnapshot;
}
/*
 * Write every row of a snapshot to the backing store.
 *
 * pFile      backing-store handle passed to streamStatePut_rocksdb
 * pSnapshot  list of SRowBuffPos* to persist
 * rowSize    number of bytes written per row
 *
 * Stops at the first failing put and returns its code; returns
 * TSDB_CODE_SUCCESS when all rows were written (or the list is empty).
 */
int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize) {
  SListIter cursor = {0};
  tdListInitIter(pSnapshot, &cursor, TD_LIST_FORWARD);

  int32_t code = TSDB_CODE_SUCCESS;
  for (SListNode* pCur = tdListNext(&cursor); pCur != NULL && code == TSDB_CODE_SUCCESS;
       pCur = tdListNext(&cursor)) {
    SRowBuffPos* pPos = *(SRowBuffPos**)pCur->data;
    code = streamStatePut_rocksdb(pFile, pPos->pKey, pPos->pRowBuff, rowSize);
  }
  return code;
}
/*
 * Placeholder for restoring cached state from the backing store.
 * Currently does nothing and always returns TSDB_CODE_SUCCESS.
 */
int32_t recoverSnapshot(SStreamFileState* pFileState) {
// Set a timestamp mark: for keys below the mark that are not in the cache, the state must be read from RocksDB; keys above the mark need no read.
// This still needs more thought: if RocksDB has no entry either, the row really is new, and that read was redundant.
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
......@@ -46,6 +46,24 @@ void *tdListFree(SList *list) {
return NULL;
}
/*
 * Remove every node from the list. For each node, `fp` is called with the
 * address of the node's payload before the node itself is freed.
 */
void tdListEmptyP(SList *list, FDelete fp) {
  for (SListNode *cur = TD_DLIST_HEAD(list); cur != NULL; cur = TD_DLIST_HEAD(list)) {
    TD_DLIST_POP(list, cur);
    fp(cur->data);
    taosMemoryFree(cur);
  }
}
/*
 * Destroy a list: run `fp` on every payload via tdListEmptyP, then free the
 * list object. A NULL list is ignored. Always returns NULL.
 */
void *tdListFreeP(SList *list, FDelete fp) {
  if (list == NULL) {
    return NULL;
  }
  tdListEmptyP(list, fp);
  taosMemoryFree(list);
  return NULL;
}
void tdListPrependNode(SList *list, SListNode *node) { TD_DLIST_PREPEND(list, node); }
void tdListAppendNode(SList *list, SListNode *node) { TD_DLIST_APPEND(list, node); }
......
......@@ -255,6 +255,8 @@
,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim
,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim
,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim
,,y,script,./test.sh -f tsim/trans/lossdata1.sim
,,y,script,./test.sh -f tsim/trans/create_db.sim
,,y,script,./test.sh -f tsim/tmq/basic1.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册