提交 b701198d 编写于 作者: dengyihao's avatar dengyihao

add backend

上级 43e16c86
...@@ -1199,8 +1199,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS ...@@ -1199,8 +1199,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
blockDataEnsureCapacity(pBlock, pBlock->info.rows + pRow->numOfRows); blockDataEnsureCapacity(pBlock, pBlock->info.rows + pRow->numOfRows);
qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s",
(pRow->numOfRows+pBlock->info.rows), (pRow->numOfRows + pBlock->info.rows), pBlock->info.capacity, GET_TASKID(pTaskInfo));
pBlock->info.capacity, GET_TASKID(pTaskInfo));
// todo set the pOperator->resultInfo size // todo set the pOperator->resultInfo size
} }
...@@ -1242,7 +1241,7 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr ...@@ -1242,7 +1241,7 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr
} else { } else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} }
tdbFree(tbname); streamFreeVal(tbname);
} }
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
...@@ -2596,6 +2595,7 @@ int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResul ...@@ -2596,6 +2595,7 @@ int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResul
} }
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) { int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize) {
qWarn("write to stream state");
streamStatePut(pState, pKey, pResult, resSize); streamStatePut(pState, pKey, pResult, resSize);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2633,7 +2633,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat ...@@ -2633,7 +2633,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
} else { } else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} }
tdbFree(tbname); streamFreeVal(tbname);
} else { } else {
// current value belongs to different group, it can't be packed into one datablock // current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.id.groupId != pKey->groupId) { if (pBlock->info.id.groupId != pKey->groupId) {
...@@ -2726,7 +2726,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta ...@@ -2726,7 +2726,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
} else { } else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} }
tdbFree(tbname); streamFreeVal(tbname);
} else { } else {
// current value belongs to different group, it can't be packed into one datablock // current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.id.groupId != pKey->groupId) { if (pBlock->info.id.groupId != pKey->groupId) {
......
...@@ -140,7 +140,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { ...@@ -140,7 +140,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
while (1) { while (1) {
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
if (pBlock == NULL) { if (pBlock == NULL) {
if (pInfo->totalInputRows == 0 && (pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) { if (pInfo->totalInputRows == 0 &&
(pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -381,12 +382,13 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ...@@ -381,12 +382,13 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr; pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL); pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyFillOperatorInfo(pInfo); destroyFillOperatorInfo(pInfo);
} }
...@@ -1049,7 +1051,7 @@ static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_ ...@@ -1049,7 +1051,7 @@ static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false); colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
tdbFree(tbname); streamFreeVal(tbname);
} }
pBlock->info.rows++; pBlock->info.rows++;
...@@ -1209,7 +1211,8 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1209,7 +1211,8 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
if (hasRemainCalc(pInfo->pFillInfo) || (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true )) { if (hasRemainCalc(pInfo->pFillInfo) ||
(pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes); doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill"); printDataBlock(pInfo->pRes, "stream fill");
...@@ -1373,8 +1376,8 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* ...@@ -1373,8 +1376,8 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->pLinearInfo->winIndex = 0; pFillInfo->pLinearInfo->winIndex = 0;
pFillInfo->pResRow = NULL; pFillInfo->pResRow = NULL;
if (pFillSup->type == TSDB_FILL_SET_VALUE || pFillSup->type == TSDB_FILL_SET_VALUE_F if (pFillSup->type == TSDB_FILL_SET_VALUE || pFillSup->type == TSDB_FILL_SET_VALUE_F ||
|| pFillSup->type == TSDB_FILL_NULL || pFillSup->type == TSDB_FILL_NULL_F) { pFillSup->type == TSDB_FILL_NULL || pFillSup->type == TSDB_FILL_NULL_F) {
pFillInfo->pResRow = taosMemoryCalloc(1, sizeof(SResultRowData)); pFillInfo->pResRow = taosMemoryCalloc(1, sizeof(SResultRowData));
pFillInfo->pResRow->key = INT64_MIN; pFillInfo->pResRow->key = INT64_MIN;
pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize); pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
...@@ -1476,7 +1479,8 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1476,7 +1479,8 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
pInfo->srcRowIndex = 0; pInfo->srcRowIndex = 0;
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
pTaskInfo); pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL); pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -1484,7 +1488,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi ...@@ -1484,7 +1488,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
} }
return pOperator; return pOperator;
_error: _error:
destroyStreamFillOperatorInfo(pInfo); destroyStreamFillOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
......
...@@ -966,7 +966,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { ...@@ -966,7 +966,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
void* tbname = NULL; void* tbname = NULL;
if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) { if (streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) {
memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
tdbFree(tbname); streamFreeVal(tbname);
} }
} }
taosArrayDestroy(pParInfo->rowIds); taosArrayDestroy(pParInfo->rowIds);
......
...@@ -1361,7 +1361,7 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS ...@@ -1361,7 +1361,7 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN); memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
varDataSetLen(tbname, strlen(varDataVal(tbname))); varDataSetLen(tbname, strlen(varDataVal(tbname)));
tdbFree(parTbname); streamFreeVal(parTbname);
} }
appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId, appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
tbname[0] == 0 ? NULL : tbname); tbname[0] == 0 ? NULL : tbname);
...@@ -1608,8 +1608,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1608,8 +1608,9 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) { if (pResult && pResult->info.rows > 0) {
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows, qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64,
pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion); pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey,
pInfo->tqReader->pWalReader->curVersion);
pTaskInfo->streamInfo.returned = 1; pTaskInfo->streamInfo.returned = 1;
return pResult; return pResult;
} else { } else {
......
...@@ -1552,7 +1552,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin ...@@ -1552,7 +1552,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName); appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName);
} }
tdbFree(tbname); streamFreeVal(tbname);
(*index)++; (*index)++;
} }
} }
...@@ -3266,7 +3266,7 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo ...@@ -3266,7 +3266,7 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false); colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
tdbFree(tbname); streamFreeVal(tbname);
} }
pBlock->info.rows += 1; pBlock->info.rows += 1;
} }
......
...@@ -13,7 +13,7 @@ if(${BUILD_WITH_ROCKSDB}) ...@@ -13,7 +13,7 @@ if(${BUILD_WITH_ROCKSDB})
PUBLIC rocksdb tdb PUBLIC rocksdb tdb
PRIVATE os util transport qcom executor PRIVATE os util transport qcom executor
) )
#add_definitions(-DUSE_ROCKSDB) add_definitions(-DUSE_ROCKSDB)
endif(${BUILD_WITH_ROCKSDB}) endif(${BUILD_WITH_ROCKSDB})
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _STREAM_BACKEDN_ROCKSDB_H_
#define _STREAM_BACKEDN_ROCKSDB_H_
#include <bits/stdint-uintn.h>
#include <string.h>
#include "executor.h"
#include "osMemory.h"
#include "rocksdb/c.h"
#include "streamInc.h"
#include "streamState.h"
#include "tcoding.h"
#include "tcommon.h"
#include "tcompare.h"
#include "ttimer.h"
int streamInitBackend(SStreamState* pState, char* path);
void streamCleanBackend(SStreamState* pState);
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key);
int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear_rocksdb(SStreamState* pState);
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key);
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key);
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen);
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateSessionClear_rocksdb(SStreamState* pState);
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen);
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]);
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal);
void streamStateDestroy_rocksdb(SStreamState* pState);
#endif
\ No newline at end of file
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "executor.h" #include "executor.h"
#include "osMemory.h" #include "osMemory.h"
#include "rocksdb/c.h" #include "rocksdb/c.h"
#include "streamBackendRocksdb.h"
#include "streamInc.h" #include "streamInc.h"
#include "tcoding.h" #include "tcoding.h"
#include "tcommon.h" #include "tcommon.h"
...@@ -102,6 +103,7 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { ...@@ -102,6 +103,7 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
} }
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
qWarn("open stream state, %s", path);
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) { if (pState == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -113,15 +115,6 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int ...@@ -113,15 +115,6 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
streamStateDestroy(pState); streamStateDestroy(pState);
return NULL; return NULL;
} }
#ifdef USE_ROCKSDB
int code = streamInitBackend(pState, path);
if (code == -1) {
taosMemoryFree(pState);
pState = NULL;
}
return pState;
#else
char statePath[1024]; char statePath[1024];
if (!specPath) { if (!specPath) {
...@@ -130,6 +123,18 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int ...@@ -130,6 +123,18 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
memset(statePath, 0, 1024); memset(statePath, 0, 1024);
tstrncpy(statePath, path, 1024); tstrncpy(statePath, path, 1024);
} }
#ifdef USE_ROCKSDB
qWarn("open stream state1");
int code = streamInitBackend(pState, statePath);
if (code == -1) {
taosMemoryFree(pState);
pState = NULL;
}
qWarn("open stream state2, %s", statePath);
pState->pTdbState->pOwner = pTask;
return pState;
#else
char cfgPath[1030]; char cfgPath[1030];
sprintf(cfgPath, "%s/cfg", statePath); sprintf(cfgPath, "%s/cfg", statePath);
...@@ -211,7 +216,7 @@ _err: ...@@ -211,7 +216,7 @@ _err:
void streamStateClose(SStreamState* pState) { void streamStateClose(SStreamState* pState) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
streamCleanBackend(pState); // streamCleanBackend(pState);
#else #else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
...@@ -227,15 +232,22 @@ void streamStateClose(SStreamState* pState) { ...@@ -227,15 +232,22 @@ void streamStateClose(SStreamState* pState) {
} }
int32_t streamStateBegin(SStreamState* pState) { int32_t streamStateBegin(SStreamState* pState) {
#ifdef USE_ROCKSDB
return 0;
#else
if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL, if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
tdbAbort(pState->pTdbState->db, pState->pTdbState->txn); tdbAbort(pState->pTdbState->db, pState->pTdbState->txn);
return -1; return -1;
} }
return 0; return 0;
#endif
} }
int32_t streamStateCommit(SStreamState* pState) { int32_t streamStateCommit(SStreamState* pState) {
#ifdef USE_ROCKSDB
return 0;
#else
if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) { if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
return -1; return -1;
} }
...@@ -248,9 +260,13 @@ int32_t streamStateCommit(SStreamState* pState) { ...@@ -248,9 +260,13 @@ int32_t streamStateCommit(SStreamState* pState) {
return -1; return -1;
} }
return 0; return 0;
#endif
} }
int32_t streamStateAbort(SStreamState* pState) { int32_t streamStateAbort(SStreamState* pState) {
#ifdef USE_ROCKSDB
return 0;
#else
if (tdbAbort(pState->pTdbState->db, pState->pTdbState->txn) < 0) { if (tdbAbort(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
return -1; return -1;
} }
...@@ -260,48 +276,84 @@ int32_t streamStateAbort(SStreamState* pState) { ...@@ -260,48 +276,84 @@ int32_t streamStateAbort(SStreamState* pState) {
return -1; return -1;
} }
return 0; return 0;
#endif
} }
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
return streamStateFuncPut_rocksdb(pState, key, value, vLen);
#else
return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn); return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
#endif
} }
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateFuncGet(pState, key, pVal, pVLen);
#else
return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
#endif
} }
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) { int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
#ifdef USE_ROCKSDB
return streamStateFuncDel_rocksdb(pState, key);
#else
return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn); return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn);
#endif
} }
// todo refactor // todo refactor
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
return streamStatePut_rocksdb(pState, key, value, vLen);
#else
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->pTdbState->txn); return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->pTdbState->txn);
#endif
} }
// todo refactor // todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
return streamStateFillPut_rocksdb(pState, key, value, vLen);
#else
return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn); return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn);
#endif
} }
// todo refactor // todo refactor
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateGet_rocksdb(pState, key, pVal, pVLen);
#else
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
#endif
} }
// todo refactor // todo refactor
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#else
return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen); return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
#endif
} }
// todo refactor // todo refactor
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateDel_rocksdb(pState, key);
#else
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn); return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn);
#endif
} }
int32_t streamStateClear(SStreamState* pState) { int32_t streamStateClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
return streamStateClear_rocksdb(pState);
#else
SWinKey key = {.ts = 0, .groupId = 0}; SWinKey key = {.ts = 0, .groupId = 0};
streamStatePut(pState, &key, NULL, 0); streamStatePut(pState, &key, NULL, 0);
while (1) { while (1) {
...@@ -316,16 +368,24 @@ int32_t streamStateClear(SStreamState* pState) { ...@@ -316,16 +368,24 @@ int32_t streamStateClear(SStreamState* pState) {
} }
} }
return 0; return 0;
#endif
} }
void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; } void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }
// todo refactor // todo refactor
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateFillDel_rocksdb(pState, key);
#else
return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn); return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn);
#endif
} }
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateAddIfNotExist_rocksdb(pState, key, pVal, pVLen);
#else
// todo refactor // todo refactor
int32_t size = *pVLen; int32_t size = *pVLen;
if (streamStateGet(pState, key, pVal, pVLen) == 0) { if (streamStateGet(pState, key, pVal, pVLen) == 0) {
...@@ -334,6 +394,7 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void* ...@@ -334,6 +394,7 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
*pVal = tdbRealloc(NULL, size); *pVal = tdbRealloc(NULL, size);
memset(*pVal, 0, size); memset(*pVal, 0, size);
return 0; return 0;
#endif
} }
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) { int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
...@@ -341,11 +402,18 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV ...@@ -341,11 +402,18 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV
if (!pVal) { if (!pVal) {
return 0; return 0;
} }
#ifdef USE_ROCKSDB
taosMemoryFree(pVal);
#else
streamFreeVal(pVal); streamFreeVal(pVal);
#endif
return 0; return 0;
} }
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateGetCur_rocksdb(pState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL); tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL);
...@@ -359,9 +427,13 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { ...@@ -359,9 +427,13 @@ SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
} }
pCur->number = pState->number; pCur->number = pState->number;
return pCur; return pCur;
#endif
} }
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateFillGetCur_rocksdb(pState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL); tdbTbcOpen(pState->pTdbState->pFillStateDb, &pCur->pCur, NULL);
...@@ -373,9 +445,13 @@ SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) ...@@ -373,9 +445,13 @@ SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key)
return NULL; return NULL;
} }
return pCur; return pCur;
#endif
} }
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) { SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateGetAndCheckCur_rocksdb(pState, key);
#else
SStreamStateCur* pCur = streamStateFillGetCur(pState, key); SStreamStateCur* pCur = streamStateFillGetCur(pState, key);
if (pCur) { if (pCur) {
int32_t code = streamStateGetGroupKVByCur(pCur, key, NULL, 0); int32_t code = streamStateGetGroupKVByCur(pCur, key, NULL, 0);
...@@ -385,9 +461,13 @@ SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) { ...@@ -385,9 +461,13 @@ SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
} }
return NULL; return NULL;
#endif
} }
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
if (!pCur) { if (!pCur) {
return -1; return -1;
} }
...@@ -401,9 +481,13 @@ int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** ...@@ -401,9 +481,13 @@ int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void**
} }
*pKey = pKTmp->key; *pKey = pKTmp->key;
return 0; return 0;
#endif
} }
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
if (!pCur) { if (!pCur) {
return -1; return -1;
} }
...@@ -414,9 +498,13 @@ int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const vo ...@@ -414,9 +498,13 @@ int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const vo
} }
*pKey = *pKTmp; *pKey = *pKTmp;
return 0; return 0;
#endif
} }
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
if (!pCur) { if (!pCur) {
return -1; return -1;
} }
...@@ -428,9 +516,13 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v ...@@ -428,9 +516,13 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v
} }
} }
return -1; return -1;
#endif
} }
int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) { int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateGetFirst_rocksdb(pState, key);
#else
// todo refactor // todo refactor
SWinKey tmp = {.ts = 0, .groupId = 0}; SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut(pState, &tmp, NULL, 0); streamStatePut(pState, &tmp, NULL, 0);
...@@ -439,6 +531,7 @@ int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) { ...@@ -439,6 +531,7 @@ int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
streamStateDel(pState, &tmp); streamStateDel(pState, &tmp);
return code; return code;
#endif
} }
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
...@@ -452,6 +545,9 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { ...@@ -452,6 +545,9 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
} }
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateSeekKeyNext_rocksdb(pState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
...@@ -476,9 +572,13 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key ...@@ -476,9 +572,13 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key
} }
return pCur; return pCur;
#endif
} }
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateFillSeekKeyNext_rocksdb(pState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (!pCur) { if (!pCur) {
return NULL; return NULL;
...@@ -501,9 +601,13 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* ...@@ -501,9 +601,13 @@ SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey*
} }
return pCur; return pCur;
#endif
} }
SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateFillSeekKeyPrev_rocksdb(pState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
...@@ -526,41 +630,63 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey* ...@@ -526,41 +630,63 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
} }
return pCur; return pCur;
#endif
} }
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
#ifdef USE_ROCKSDB
return streamStateCurNext_rocksdb(pState, pCur);
#else
if (!pCur) { if (!pCur) {
return -1; return -1;
} }
// //
return tdbTbcMoveToNext(pCur->pCur); return tdbTbcMoveToNext(pCur->pCur);
#endif
} }
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
// #ifdef USE_ROCKSDB
return streamStateCurPrev_rocksdb(pState, pCur);
#else
if (!pCur) { if (!pCur) {
return -1; return -1;
} }
return tdbTbcMoveToPrev(pCur->pCur); return tdbTbcMoveToPrev(pCur->pCur);
#endif
} }
void streamStateFreeCur(SStreamStateCur* pCur) { void streamStateFreeCur(SStreamStateCur* pCur) {
if (!pCur) { if (!pCur) {
return; return;
} }
tdbTbcClose(pCur->pCur);
rocksdb_iter_destroy(pCur->iter); rocksdb_iter_destroy(pCur->iter);
tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur); taosMemoryFree(pCur);
} }
void streamFreeVal(void* val) { tdbFree(val); } void streamFreeVal(void* val) {
#ifdef USE_ROCKSDB
taosMemoryFree(val);
#else
tdbFree(val);
#endif
}
int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
return streamStateSessionPut_rocksdb(pState, key, value, vLen);
#else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,
pState->pTdbState->txn); pState->pTdbState->txn);
#endif
} }
int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateSessionGet_rocksdb(pState, key, pVal, pVLen);
#else
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
SSessionKey resKey = *key; SSessionKey resKey = *key;
void* tmp = NULL; void* tmp = NULL;
...@@ -576,14 +702,22 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa ...@@ -576,14 +702,22 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return code; return code;
#endif
} }
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
return streamStateSessionDel_rocksdb(pState, key);
#else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn); return tdbTbDelete(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->pTdbState->txn);
#endif
} }
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
return streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
...@@ -608,9 +742,13 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons ...@@ -608,9 +742,13 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons
} }
return pCur; return pCur;
#endif
} }
SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
return streamStateSessionSeekKeyCurrentNext_rocksdb(pState, (SSessionKey*)key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
...@@ -636,9 +774,13 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons ...@@ -636,9 +774,13 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons
} }
return pCur; return pCur;
#endif
} }
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB
return streamStateSessionSeekKeyNext_rocksdb(pState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
...@@ -663,9 +805,13 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess ...@@ -663,9 +805,13 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
} }
return pCur; return pCur;
#endif
} }
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
#else
if (!pCur) { if (!pCur) {
return -1; return -1;
} }
...@@ -682,9 +828,13 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, v ...@@ -682,9 +828,13 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, v
} }
*pKey = pKTmp->key; *pKey = pKTmp->key;
return 0; return 0;
#endif
} }
int32_t streamStateSessionClear(SStreamState* pState) { int32_t streamStateSessionClear(SStreamState* pState) {
#ifdef USE_ROCKSDB
return streamStateSessionClear_rocksdb(pState);
#else
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key);
while (1) { while (1) {
...@@ -702,9 +852,13 @@ int32_t streamStateSessionClear(SStreamState* pState) { ...@@ -702,9 +852,13 @@ int32_t streamStateSessionClear(SStreamState* pState) {
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return 0; return 0;
#endif
} }
int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
#ifdef USE_ROCKSDB
return streamStateSessionGetKeyByRange_rocksdb(pState, key, curKey);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return -1; return -1;
...@@ -750,10 +904,14 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* ...@@ -750,10 +904,14 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey*
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return -1; return -1;
#endif
} }
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen) { int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen);
#else
// todo refactor // todo refactor
int32_t res = 0; int32_t res = 0;
SSessionKey originKey = *key; SSessionKey originKey = *key;
...@@ -799,11 +957,17 @@ _end: ...@@ -799,11 +957,17 @@ _end:
*pVal = tmp; *pVal = tmp;
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return res; return res;
#endif
} }
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
// todo refactor // todo refactor
#ifdef USE_ROCKSDB
return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen);
#else
int32_t res = 0; int32_t res = 0;
SSessionKey tmpKey = *key; SSessionKey tmpKey = *key;
int32_t valSize = *pVLen; int32_t valSize = *pVLen;
...@@ -854,27 +1018,48 @@ _end: ...@@ -854,27 +1018,48 @@ _end:
*pVal = tmp; *pVal = tmp;
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return res; return res;
#endif
} }
int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
#ifdef USE_ROCKSDB
return streamStatePutParTag_rocksdb(pState, groupId, tag, tagLen);
#else
return tdbTbUpsert(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tag, tagLen, pState->pTdbState->txn); return tdbTbUpsert(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tag, tagLen, pState->pTdbState->txn);
#endif
} }
int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) { int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) {
#ifdef USE_ROCKSDB
return streamStateGetParTag_rocksdb(pState, groupId, tagVal, tagLen);
#else
return tdbTbGet(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tagVal, tagLen); return tdbTbGet(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tagVal, tagLen);
#endif
} }
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
#ifdef USE_ROCKSDB
return streamStatePutParName_rocksdb(pState, groupId, tbname);
#else
return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, return tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN,
pState->pTdbState->txn); pState->pTdbState->txn);
#endif
} }
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) { int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
#ifdef USE_ROCKSDB
return streamStateGetParName_rocksdb(pState, groupId, pVal);
#else
int32_t len; int32_t len;
return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len); return tdbTbGet(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
#endif
} }
void streamStateDestroy(SStreamState* pState) { void streamStateDestroy(SStreamState* pState) {
#ifdef USE_ROCKSDB
streamStateDestroy_rocksdb(pState);
// do nothong
#endif
taosMemoryFreeClear(pState->pTdbState); taosMemoryFreeClear(pState->pTdbState);
taosMemoryFreeClear(pState); taosMemoryFreeClear(pState);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册