Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d1a746e4
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d1a746e4
编写于
5月 16, 2023
作者:
H
Haojun Liao
提交者:
GitHub
5月 16, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #21295 from taosdata/enh/rocksdbSstateMerge
Enh/rocksdb sstate merge
上级
de0cc463
941e0054
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
138 addition
and
142 deletion
+138
-142
cmake/cmake.define
cmake/cmake.define
+1
-1
include/libs/stream/streamState.h
include/libs/stream/streamState.h
+3
-3
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+3
-4
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
+3
-0
source/libs/stream/inc/streamBackendRocksdb.h
source/libs/stream/inc/streamBackendRocksdb.h
+1
-11
source/libs/stream/inc/streamInc.h
source/libs/stream/inc/streamInc.h
+4
-1
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+59
-79
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+9
-8
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+17
-18
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+3
-3
source/libs/stream/src/tstreamFileState.c
source/libs/stream/src/tstreamFileState.c
+11
-8
source/libs/stream/test/CMakeLists.txt
source/libs/stream/test/CMakeLists.txt
+1
-1
source/libs/stream/test/tstreamUpdateTest.cpp
source/libs/stream/test/tstreamUpdateTest.cpp
+23
-5
未找到文件。
cmake/cmake.define
浏览文件 @
d1a746e4
cmake_minimum_required(VERSION 3.0)
cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE O
N
)
set(CMAKE_VERBOSE_MAKEFILE O
FF
)
set(TD_BUILD_TAOSA_INTERNAL FALSE)
set(TD_BUILD_TAOSA_INTERNAL FALSE)
#set output directory
#set output directory
...
...
include/libs/stream/streamState.h
浏览文件 @
d1a746e4
...
@@ -20,13 +20,13 @@
...
@@ -20,13 +20,13 @@
#include "tsimplehash.h"
#include "tsimplehash.h"
#include "tstreamFileState.h"
#include "tstreamFileState.h"
#ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
#endif
#endif
#ifndef _STREAM_STATE_H_
#define _STREAM_STATE_H_
// void* streamBackendInit(const char* path);
// void* streamBackendInit(const char* path);
// void streamBackendCleanup(void* arg);
// void streamBackendCleanup(void* arg);
// SListNode* streamBackendAddCompare(void* backend, void* arg);
// SListNode* streamBackendAddCompare(void* backend, void* arg);
...
...
include/libs/stream/tstream.h
浏览文件 @
d1a746e4
...
@@ -13,16 +13,13 @@
...
@@ -13,16 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "executor.h"
#include "os.h"
#include "os.h"
#include "query.h"
#include "streamState.h"
#include "streamState.h"
#include "tdatablock.h"
#include "tdatablock.h"
#include "tdbInt.h"
#include "tdbInt.h"
#include "tmsg.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tmsgcb.h"
#include "tqueue.h"
#include "tqueue.h"
#include "trpc.h"
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
...
@@ -340,7 +337,7 @@ typedef struct SStreamMeta {
...
@@ -340,7 +337,7 @@ typedef struct SStreamMeta {
TTB
*
pTaskDb
;
TTB
*
pTaskDb
;
TTB
*
pCheckpointDb
;
TTB
*
pCheckpointDb
;
SHashObj
*
pTasks
;
SHashObj
*
pTasks
;
SArray
*
pTaskList
;
// SArray<task_id*>
SArray
*
pTaskList
;
// SArray<task_id*>
void
*
ahandle
;
void
*
ahandle
;
TXN
*
txn
;
TXN
*
txn
;
FTaskExpand
*
expandFunc
;
FTaskExpand
*
expandFunc
;
...
@@ -568,6 +565,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask);
...
@@ -568,6 +565,8 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t
streamProcessRecoverFinishReq
(
SStreamTask
*
pTask
,
int32_t
childId
);
int32_t
streamProcessRecoverFinishReq
(
SStreamTask
*
pTask
,
int32_t
childId
);
void
streamMetaInit
();
void
streamMetaCleanup
();
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
,
int32_t
vgId
);
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
,
int32_t
vgId
);
void
streamMetaClose
(
SStreamMeta
*
streamMeta
);
void
streamMetaClose
(
SStreamMeta
*
streamMeta
);
...
...
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
浏览文件 @
d1a746e4
...
@@ -18,6 +18,7 @@
...
@@ -18,6 +18,7 @@
#include "dmNodes.h"
#include "dmNodes.h"
#include "index.h"
#include "index.h"
#include "qworker.h"
#include "qworker.h"
#include "tstream.h"
static
bool
dmRequireNode
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
)
{
static
bool
dmRequireNode
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
)
{
SMgmtInputOpt
input
=
dmBuildMgmtInputOpt
(
pWrapper
);
SMgmtInputOpt
input
=
dmBuildMgmtInputOpt
(
pWrapper
);
...
@@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
...
@@ -153,6 +154,7 @@ int32_t dmInitDnode(SDnode *pDnode) {
}
}
indexInit
(
tsNumOfCommitThreads
);
indexInit
(
tsNumOfCommitThreads
);
streamMetaInit
();
dmReportStartup
(
"dnode-transport"
,
"initialized"
);
dmReportStartup
(
"dnode-transport"
,
"initialized"
);
dDebug
(
"dnode is created, ptr:%p"
,
pDnode
);
dDebug
(
"dnode is created, ptr:%p"
,
pDnode
);
...
@@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) {
...
@@ -175,6 +177,7 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupServer
(
pDnode
);
dmCleanupServer
(
pDnode
);
dmClearVars
(
pDnode
);
dmClearVars
(
pDnode
);
rpcCleanup
();
rpcCleanup
();
streamMetaCleanup
();
indexCleanup
();
indexCleanup
();
taosConvDestroy
();
taosConvDestroy
();
dDebug
(
"dnode is closed, ptr:%p"
,
pDnode
);
dDebug
(
"dnode is closed, ptr:%p"
,
pDnode
);
...
...
source/libs/stream/inc/streamBackendRocksdb.h
浏览文件 @
d1a746e4
...
@@ -16,8 +16,6 @@
...
@@ -16,8 +16,6 @@
#ifndef _STREAM_BACKEDN_ROCKSDB_H_
#ifndef _STREAM_BACKEDN_ROCKSDB_H_
#define _STREAM_BACKEDN_ROCKSDB_H_
#define _STREAM_BACKEDN_ROCKSDB_H_
#include "executor.h"
#include "rocksdb/c.h"
#include "rocksdb/c.h"
// #include "streamInc.h"
// #include "streamInc.h"
#include "streamState.h"
#include "streamState.h"
...
@@ -112,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
...
@@ -112,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
void
streamStateDestroy_rocksdb
(
SStreamState
*
pState
,
bool
remove
);
void
streamStateDestroy_rocksdb
(
SStreamState
*
pState
,
bool
remove
);
void
*
streamStateCreateBatch
();
int32_t
streamStateGetBatchSize
(
void
*
pBatch
);
void
streamStateClearBatch
(
void
*
pBatch
);
void
streamStateDestroyBatch
(
void
*
pBatch
);
int32_t
streamStatePutBatch
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_writebatch_t
*
pBatch
,
void
*
key
,
void
*
val
,
int32_t
vlen
);
int32_t
streamStatePutBatch_rocksdb
(
SStreamState
*
pState
,
void
*
pBatch
);
// default cf
// default cf
int32_t
streamDefaultPut_rocksdb
(
SStreamState
*
pState
,
const
void
*
key
,
void
*
pVal
,
int32_t
pVLen
);
int32_t
streamDefaultPut_rocksdb
(
SStreamState
*
pState
,
const
void
*
key
,
void
*
pVal
,
int32_t
pVLen
);
int32_t
streamDefaultGet_rocksdb
(
SStreamState
*
pState
,
const
void
*
key
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamDefaultGet_rocksdb
(
SStreamState
*
pState
,
const
void
*
key
,
void
**
pVal
,
int32_t
*
pVLen
);
...
@@ -138,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch);
...
@@ -138,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch);
void
streamStateClearBatch
(
void
*
pBatch
);
void
streamStateClearBatch
(
void
*
pBatch
);
void
streamStateDestroyBatch
(
void
*
pBatch
);
void
streamStateDestroyBatch
(
void
*
pBatch
);
int32_t
streamStatePutBatch
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_writebatch_t
*
pBatch
,
void
*
key
,
int32_t
streamStatePutBatch
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_writebatch_t
*
pBatch
,
void
*
key
,
void
*
val
,
int32_t
vlen
);
void
*
val
,
int32_t
vlen
,
int64_t
ttl
);
int32_t
streamStatePutBatch_rocksdb
(
SStreamState
*
pState
,
void
*
pBatch
);
int32_t
streamStatePutBatch_rocksdb
(
SStreamState
*
pState
,
void
*
pBatch
);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif
#endif
\ No newline at end of file
source/libs/stream/inc/streamInc.h
浏览文件 @
d1a746e4
...
@@ -16,9 +16,12 @@
...
@@ -16,9 +16,12 @@
#ifndef _STREAM_INC_H_
#ifndef _STREAM_INC_H_
#define _STREAM_INC_H_
#define _STREAM_INC_H_
//#include "executor.h"
#include "executor.h"
#include "query.h"
#include "tstream.h"
#include "tstream.h"
#include "trpc.h"
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
#endif
#endif
...
...
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
d1a746e4
...
@@ -13,8 +13,9 @@
...
@@ -13,8 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
// #include "streamStateRocksdb.h"
#include "streamBackendRocksdb.h"
#include "streamBackendRocksdb.h"
#include "executor.h"
#include "query.h"
#include "tcommon.h"
#include "tcommon.h"
typedef
struct
SCompactFilteFactory
{
typedef
struct
SCompactFilteFactory
{
...
@@ -110,6 +111,9 @@ void* streamBackendInit(const char* path) {
...
@@ -110,6 +111,9 @@ void* streamBackendInit(const char* path) {
taosMemoryFreeClear
(
err
);
taosMemoryFreeClear
(
err
);
}
}
}
else
{
}
else
{
/*
list all cf and get prefix
*/
int64_t
streamId
;
int64_t
streamId
;
int32_t
taskId
,
dummpy
=
0
;
int32_t
taskId
,
dummpy
=
0
;
SHashObj
*
tbl
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
SHashObj
*
tbl
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
...
@@ -649,18 +653,7 @@ const char* compactFilteFactoryName(void* arg) {
...
@@ -649,18 +653,7 @@ const char* compactFilteFactoryName(void* arg) {
void
destroyCompactFilte
(
void
*
arg
)
{
(
void
)
arg
;
}
void
destroyCompactFilte
(
void
*
arg
)
{
(
void
)
arg
;
}
unsigned
char
compactFilte
(
void
*
arg
,
int
level
,
const
char
*
key
,
size_t
klen
,
const
char
*
val
,
size_t
vlen
,
unsigned
char
compactFilte
(
void
*
arg
,
int
level
,
const
char
*
key
,
size_t
klen
,
const
char
*
val
,
size_t
vlen
,
char
**
newval
,
size_t
*
newvlen
,
unsigned
char
*
value_changed
)
{
char
**
newval
,
size_t
*
newvlen
,
unsigned
char
*
value_changed
)
{
// int64_t unixTime = taosGetTimestampMs();
return
streamStateValueIsStale
((
char
*
)
val
)
?
1
:
0
;
if
(
streamStateValueIsStale
((
char
*
)
val
))
{
return
1
;
}
// SStreamValue value;
// memset(&value, 0, sizeof(value));
// streamValueDecode(&value, (char*)val);
// taosMemoryFree(value.data);
// if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) {
// return 1;
// }
return
0
;
}
}
const
char
*
compactFilteName
(
void
*
arg
)
{
return
"stream_filte"
;
}
const
char
*
compactFilteName
(
void
*
arg
)
{
return
"stream_filte"
;
}
...
@@ -703,7 +696,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
...
@@ -703,7 +696,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
memcpy
(
cfNames
[
0
],
"default"
,
strlen
(
"default"
));
memcpy
(
cfNames
[
0
],
"default"
,
strlen
(
"default"
));
continue
;
continue
;
}
}
qError
(
"cf name %s"
,
idstr
);
GEN_COLUMN_FAMILY_NAME
(
cfNames
[
i
],
idstr
,
ginitDict
[(
i
-
1
)
%
(
cfLen
)].
key
);
GEN_COLUMN_FAMILY_NAME
(
cfNames
[
i
],
idstr
,
ginitDict
[(
i
-
1
)
%
(
cfLen
)].
key
);
if
(
i
%
cfLen
==
0
)
{
if
(
i
%
cfLen
==
0
)
{
...
@@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
...
@@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
if
(
pIter
!=
NULL
)
idstr
=
taosHashGetKey
(
pIter
,
&
keyLen
);
if
(
pIter
!=
NULL
)
idstr
=
taosHashGetKey
(
pIter
,
&
keyLen
);
}
}
}
}
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
qError
(
"cf name %s"
,
cfNames
[
i
]);
}
rocksdb_options_t
**
cfOpts
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
rocksdb_options_t
*
));
rocksdb_options_t
**
cfOpts
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
rocksdb_options_t
*
));
RocksdbCfParam
*
params
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
RocksdbCfParam
*
));
RocksdbCfParam
*
params
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
RocksdbCfParam
*
));
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
...
@@ -858,7 +847,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
...
@@ -858,7 +847,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
if
(
err
!=
NULL
)
{
if
(
err
!=
NULL
)
{
qError
(
"failed to create cf:%s_%s, reason:%s"
,
pState
->
pTdbState
->
idstr
,
ginitDict
[
i
].
key
,
err
);
qError
(
"failed to create cf:%s_%s, reason:%s"
,
pState
->
pTdbState
->
idstr
,
ginitDict
[
i
].
key
,
err
);
taosMemoryFreeClear
(
err
);
taosMemoryFreeClear
(
err
);
// return -1;
}
}
}
}
pState
->
pTdbState
->
rocksdb
=
handle
->
db
;
pState
->
pTdbState
->
rocksdb
=
handle
->
db
;
...
@@ -1012,53 +1000,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -1012,53 +1000,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
taosMemoryFree(ttlV); \
taosMemoryFree(ttlV); \
} while (0);
} while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
do { \
code = 0; \
code = 0; \
char buf[128] = {0}; \
char buf[128] = {0}; \
char* err = NULL; \
char* err = NULL; \
int i = streamGetInit(funcname); \
int i = streamGetInit(funcname); \
if (i < 0) { \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
code = -1; \
break; \
break; \
} \
} \
char toString[128] = {0}; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \
if (val == NULL) { \
qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
if (err == NULL) { \
if (err != NULL) taosMemoryFree(err); \
qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
code = -1; \
funcname); \
} else { \
} else { \
char * p = NULL, *end = NULL; \
qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \
err); \
if (len < 0) { \
taosMemoryFreeClear(err); \
qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \
} \
code = -1; \
code = -1; \
} else { \
} else { \
qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \
char* p = NULL; \
} \
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (pVal != NULL) { \
if (len < 0) { \
*pVal = p; \
qDebug("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \
} else { \
funcname); \
taosMemoryFree(p); \
code = -1; \
} \
} else { \
taosMemoryFree(val); \
qDebug("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
if (vLen != NULL) *vLen = len; \
len); \
} \
} \
if (err != NULL) { \
taosMemoryFree(val); \
taosMemoryFree(err); \
if (vLen != NULL) *vLen = len; \
qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
} \
code = -1; \
if (code == 0) \
} else { \
qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \
if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
} \
} while (0);
} while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
...
@@ -1133,10 +1119,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
...
@@ -1133,10 +1119,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
// eLen);
// eLen);
if
(
err
!=
NULL
)
{
if
(
err
!=
NULL
)
{
qWarn
(
qWarn
(
"failed to delete range cf(state) start: %s, end:%s, reason:%s"
,
toStringStart
,
toStringEnd
,
err
);
"failed to delete range cf(state) err: %s, "
"start: %s, end:%s"
,
err
,
toStringStart
,
toStringEnd
);
taosMemoryFree
(
err
);
taosMemoryFree
(
err
);
}
}
...
@@ -1588,20 +1571,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
...
@@ -1588,20 +1571,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
if
(
!
rocksdb_iter_valid
(
pCur
->
iter
)
||
iterValueIsStale
(
pCur
->
iter
))
{
if
(
!
rocksdb_iter_valid
(
pCur
->
iter
)
||
iterValueIsStale
(
pCur
->
iter
))
{
return
-
1
;
return
-
1
;
}
}
size_t
t
len
;
size_t
klen
,
v
len
;
char
*
keyStr
=
(
char
*
)
rocksdb_iter_key
(
pCur
->
iter
,
&
t
len
);
char
*
keyStr
=
(
char
*
)
rocksdb_iter_key
(
pCur
->
iter
,
&
k
len
);
winKeyDecode
(
&
winKey
,
keyStr
);
winKeyDecode
(
&
winKey
,
keyStr
);
size_t
vlen
=
0
;
const
char
*
valStr
=
rocksdb_iter_value
(
pCur
->
iter
,
&
vlen
);
const
char
*
valStr
=
rocksdb_iter_value
(
pCur
->
iter
,
&
vlen
);
char
*
dst
=
NULL
;
//
char* dst = NULL;
int32_t
len
=
decodeValueFunc
((
void
*
)
valStr
,
vlen
,
NULL
,
&
dst
);
int32_t
len
=
decodeValueFunc
((
void
*
)
valStr
,
vlen
,
NULL
,
(
char
**
)
pVal
);
if
(
len
<
0
)
{
if
(
len
<
0
)
{
return
-
1
;
return
-
1
;
}
}
if
(
pVLen
!=
NULL
)
*
pVLen
=
len
;
if
(
pVal
!=
NULL
)
*
pVal
=
(
char
*
)
dst
;
if
(
pVLen
!=
NULL
)
*
pVLen
=
vlen
;
*
pKey
=
winKey
;
*
pKey
=
winKey
;
return
0
;
return
0
;
...
@@ -1999,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) {
...
@@ -1999,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) {
void
streamStateClearBatch
(
void
*
pBatch
)
{
rocksdb_writebatch_clear
((
rocksdb_writebatch_t
*
)
pBatch
);
}
void
streamStateClearBatch
(
void
*
pBatch
)
{
rocksdb_writebatch_clear
((
rocksdb_writebatch_t
*
)
pBatch
);
}
void
streamStateDestroyBatch
(
void
*
pBatch
)
{
rocksdb_writebatch_destroy
((
rocksdb_writebatch_t
*
)
pBatch
);
}
void
streamStateDestroyBatch
(
void
*
pBatch
)
{
rocksdb_writebatch_destroy
((
rocksdb_writebatch_t
*
)
pBatch
);
}
int32_t
streamStatePutBatch
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_writebatch_t
*
pBatch
,
void
*
key
,
int32_t
streamStatePutBatch
(
SStreamState
*
pState
,
const
char
*
cfName
,
rocksdb_writebatch_t
*
pBatch
,
void
*
key
,
void
*
val
,
int32_t
vlen
)
{
void
*
val
,
int32_t
vlen
,
int64_t
ttl
)
{
int
i
=
streamGetInit
(
cfName
);
int
i
=
streamGetInit
(
cfName
);
if
(
i
<
0
)
{
if
(
i
<
0
)
{
...
@@ -2010,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
...
@@ -2010,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
int32_t
klen
=
ginitDict
[
i
].
enFunc
((
void
*
)
key
,
buf
);
int32_t
klen
=
ginitDict
[
i
].
enFunc
((
void
*
)
key
,
buf
);
char
*
ttlV
=
NULL
;
char
*
ttlV
=
NULL
;
int32_t
ttlVLen
=
ginitDict
[
i
].
enValueFunc
(
val
,
vlen
,
0
,
&
ttlV
);
int32_t
ttlVLen
=
ginitDict
[
i
].
enValueFunc
(
val
,
vlen
,
ttl
,
&
ttlV
);
rocksdb_column_family_handle_t
*
pCf
=
pState
->
pTdbState
->
pHandle
[
ginitDict
[
i
].
idx
];
rocksdb_column_family_handle_t
*
pCf
=
pState
->
pTdbState
->
pHandle
[
ginitDict
[
i
].
idx
];
rocksdb_writebatch_put_cf
((
rocksdb_writebatch_t
*
)
pBatch
,
pCf
,
buf
,
(
size_t
)
klen
,
ttlV
,
(
size_t
)
ttlVLen
);
rocksdb_writebatch_put_cf
((
rocksdb_writebatch_t
*
)
pBatch
,
pCf
,
buf
,
(
size_t
)
klen
,
ttlV
,
(
size_t
)
ttlVLen
);
taosMemoryFree
(
ttlV
);
taosMemoryFree
(
ttlV
);
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
d1a746e4
...
@@ -20,12 +20,12 @@
...
@@ -20,12 +20,12 @@
#define MIN_STREAM_EXEC_BATCH_NUM 16
#define MIN_STREAM_EXEC_BATCH_NUM 16
bool
streamTaskShouldStop
(
const
SStreamStatus
*
pStatus
)
{
bool
streamTaskShouldStop
(
const
SStreamStatus
*
pStatus
)
{
int32_t
status
=
atomic_load_8
((
int8_t
*
)
&
pStatus
->
taskStatus
);
int32_t
status
=
atomic_load_8
((
int8_t
*
)
&
pStatus
->
taskStatus
);
return
(
status
==
TASK_STATUS__STOP
)
||
(
status
==
TASK_STATUS__DROPPING
);
return
(
status
==
TASK_STATUS__STOP
)
||
(
status
==
TASK_STATUS__DROPPING
);
}
}
bool
streamTaskShouldPause
(
const
SStreamStatus
*
pStatus
)
{
bool
streamTaskShouldPause
(
const
SStreamStatus
*
pStatus
)
{
int32_t
status
=
atomic_load_8
((
int8_t
*
)
&
pStatus
->
taskStatus
);
int32_t
status
=
atomic_load_8
((
int8_t
*
)
&
pStatus
->
taskStatus
);
return
(
status
==
TASK_STATUS__PAUSE
);
return
(
status
==
TASK_STATUS__PAUSE
);
}
}
...
@@ -53,14 +53,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
...
@@ -53,14 +53,14 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
);
ASSERT
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
);
const
SStreamDataSubmit2
*
pSubmit
=
(
const
SStreamDataSubmit2
*
)
data
;
const
SStreamDataSubmit2
*
pSubmit
=
(
const
SStreamDataSubmit2
*
)
data
;
qSetMultiStreamInput
(
pExecutor
,
&
pSubmit
->
submit
,
1
,
STREAM_INPUT__DATA_SUBMIT
);
qSetMultiStreamInput
(
pExecutor
,
&
pSubmit
->
submit
,
1
,
STREAM_INPUT__DATA_SUBMIT
);
qDebug
(
"s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%"
PRId64
,
pTask
->
id
.
idStr
,
pSubmit
,
pSubmit
->
submit
.
msgStr
,
qDebug
(
"s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%"
PRId64
,
pTask
->
id
.
idStr
,
pSubmit
,
pSubmit
->
submit
.
msgLen
,
pSubmit
->
submit
.
ver
);
pSubmit
->
submit
.
msg
Str
,
pSubmit
->
submit
.
msg
Len
,
pSubmit
->
submit
.
ver
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_BLOCK
||
pItem
->
type
==
STREAM_INPUT__DATA_RETRIEVE
)
{
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_BLOCK
||
pItem
->
type
==
STREAM_INPUT__DATA_RETRIEVE
)
{
const
SStreamDataBlock
*
pBlock
=
(
const
SStreamDataBlock
*
)
data
;
const
SStreamDataBlock
*
pBlock
=
(
const
SStreamDataBlock
*
)
data
;
SArray
*
pBlockList
=
pBlock
->
blocks
;
SArray
*
pBlockList
=
pBlock
->
blocks
;
int32_t
numOfBlocks
=
taosArrayGetSize
(
pBlockList
);
int32_t
numOfBlocks
=
taosArrayGetSize
(
pBlockList
);
qDebug
(
"s-task:%s set sdata blocks as input num:%d, ver:%"
PRId64
,
pTask
->
id
.
idStr
,
numOfBlocks
,
pBlock
->
sourceVer
);
qDebug
(
"s-task:%s set sdata blocks as input num:%d, ver:%"
PRId64
,
pTask
->
id
.
idStr
,
numOfBlocks
,
pBlock
->
sourceVer
);
qSetMultiStreamInput
(
pExecutor
,
pBlockList
->
pData
,
numOfBlocks
,
STREAM_INPUT__DATA_BLOCK
);
qSetMultiStreamInput
(
pExecutor
,
pBlockList
->
pData
,
numOfBlocks
,
STREAM_INPUT__DATA_BLOCK
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__MERGED_SUBMIT
)
{
}
else
if
(
pItem
->
type
==
STREAM_INPUT__MERGED_SUBMIT
)
{
const
SStreamMergedSubmit2
*
pMerged
=
(
const
SStreamMergedSubmit2
*
)
data
;
const
SStreamMergedSubmit2
*
pMerged
=
(
const
SStreamMergedSubmit2
*
)
data
;
...
@@ -202,7 +202,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
...
@@ -202,7 +202,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
qRes
->
blocks
=
pRes
;
qRes
->
blocks
=
pRes
;
code
=
streamTaskOutput
(
pTask
,
qRes
);
code
=
streamTaskOutput
(
pTask
,
qRes
);
if
(
code
==
TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY
)
{
if
(
code
==
TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY
)
{
taosFreeQitem
(
pRes
);
taosArrayDestroyEx
(
pRes
,
(
FDelete
)
blockDataFreeRes
);
taosFreeQitem
(
qRes
);
return
code
;
return
code
;
}
}
...
@@ -332,12 +333,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
...
@@ -332,12 +333,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int64_t
ckId
=
0
;
int64_t
ckId
=
0
;
int64_t
dataVer
=
0
;
int64_t
dataVer
=
0
;
qGetCheckpointVersion
(
pTask
->
exec
.
pExecutor
,
&
dataVer
,
&
ckId
);
qGetCheckpointVersion
(
pTask
->
exec
.
pExecutor
,
&
dataVer
,
&
ckId
);
if
(
ckId
>
pTask
->
chkInfo
.
id
)
{
// save it since the checkpoint is updated
if
(
ckId
>
pTask
->
chkInfo
.
id
)
{
// save it since the checkpoint is updated
qDebug
(
"s-task:%s exec end, start to update check point, ver from %"
PRId64
" to %"
PRId64
qDebug
(
"s-task:%s exec end, start to update check point, ver from %"
PRId64
" to %"
PRId64
", checkPoint id:%"
PRId64
" -> %"
PRId64
,
", checkPoint id:%"
PRId64
" -> %"
PRId64
,
pTask
->
id
.
idStr
,
pTask
->
chkInfo
.
version
,
dataVer
,
pTask
->
chkInfo
.
id
,
ckId
);
pTask
->
id
.
idStr
,
pTask
->
chkInfo
.
version
,
dataVer
,
pTask
->
chkInfo
.
id
,
ckId
);
pTask
->
chkInfo
=
(
SCheckpointInfo
)
{.
version
=
dataVer
,
.
id
=
ckId
,
.
currentVer
=
pTask
->
chkInfo
.
currentVer
};
pTask
->
chkInfo
=
(
SCheckpointInfo
){.
version
=
dataVer
,
.
id
=
ckId
,
.
currentVer
=
pTask
->
chkInfo
.
currentVer
};
taosWLockLatch
(
&
pTask
->
pMeta
->
lock
);
taosWLockLatch
(
&
pTask
->
pMeta
->
lock
);
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
d1a746e4
...
@@ -19,6 +19,13 @@
...
@@ -19,6 +19,13 @@
#include "tref.h"
#include "tref.h"
#include "ttimer.h"
#include "ttimer.h"
static
TdThreadOnce
streamMetaModuleInit
=
PTHREAD_ONCE_INIT
;
static
int32_t
streamBackendId
=
0
;
static
void
streamMetaEnvInit
()
{
streamBackendId
=
taosOpenRef
(
20
,
streamBackendCleanup
);
}
void
streamMetaInit
()
{
taosThreadOnce
(
&
streamMetaModuleInit
,
streamMetaEnvInit
);
}
void
streamMetaCleanup
()
{
taosCloseRef
(
streamBackendId
);
}
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
,
int32_t
vgId
)
{
SStreamMeta
*
streamMetaOpen
(
const
char
*
path
,
void
*
ahandle
,
FTaskExpand
expandFunc
,
int32_t
vgId
)
{
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
SStreamMeta
*
pMeta
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamMeta
));
SStreamMeta
*
pMeta
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamMeta
));
...
@@ -32,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
...
@@ -32,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
sprintf
(
streamPath
,
"%s/%s"
,
path
,
"stream"
);
sprintf
(
streamPath
,
"%s/%s"
,
path
,
"stream"
);
pMeta
->
path
=
taosStrdup
(
streamPath
);
pMeta
->
path
=
taosStrdup
(
streamPath
);
if
(
tdbOpen
(
pMeta
->
path
,
16
*
1024
,
1
,
&
pMeta
->
db
,
0
)
<
0
)
{
if
(
tdbOpen
(
pMeta
->
path
,
16
*
1024
,
1
,
&
pMeta
->
db
,
0
)
<
0
)
{
taosMemoryFree
(
streamPath
);
goto
_err
;
goto
_err
;
}
}
memset
(
streamPath
,
0
,
len
);
sprintf
(
streamPath
,
"%s/%s"
,
pMeta
->
path
,
"checkpoints"
);
sprintf
(
streamPath
,
"%s/%s"
,
pMeta
->
path
,
"checkpoints"
);
code
=
taosMulModeMkDir
(
streamPath
,
0755
);
code
=
taosMulModeMkDir
(
streamPath
,
0755
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
taosMemoryFree
(
streamPath
);
goto
_err
;
goto
_err
;
}
}
taosMemoryFree
(
streamPath
);
if
(
tdbTbOpen
(
"task.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pMeta
->
db
,
&
pMeta
->
pTaskDb
,
0
)
<
0
)
{
if
(
tdbTbOpen
(
"task.db"
,
sizeof
(
int32_t
),
-
1
,
NULL
,
pMeta
->
db
,
&
pMeta
->
pTaskDb
,
0
)
<
0
)
{
goto
_err
;
goto
_err
;
...
@@ -74,26 +79,26 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
...
@@ -74,26 +79,26 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta
->
vgId
=
vgId
;
pMeta
->
vgId
=
vgId
;
pMeta
->
ahandle
=
ahandle
;
pMeta
->
ahandle
=
ahandle
;
pMeta
->
expandFunc
=
expandFunc
;
pMeta
->
expandFunc
=
expandFunc
;
pMeta
->
streamBackendId
=
streamBackendId
;
char
*
statePath
=
taosMemoryCalloc
(
1
,
len
);
memset
(
streamPath
,
0
,
len
);
sprintf
(
st
ate
Path
,
"%s/%s"
,
pMeta
->
path
,
"state"
);
sprintf
(
st
ream
Path
,
"%s/%s"
,
pMeta
->
path
,
"state"
);
code
=
taosMulModeMkDir
(
st
ate
Path
,
0755
);
code
=
taosMulModeMkDir
(
st
ream
Path
,
0755
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
taosMemoryFree
(
streamPath
);
goto
_err
;
goto
_err
;
}
}
pMeta
->
streamBackend
=
streamBackendInit
(
statePath
);
pMeta
->
streamBackend
=
streamBackendInit
(
streamPath
);
pMeta
->
streamBackendId
=
taosOpenRef
(
20
,
streamBackendCleanup
);
pMeta
->
streamBackendRid
=
taosAddRef
(
streamBackendId
,
pMeta
->
streamBackend
);
pMeta
->
streamBackendRid
=
taosAddRef
(
pMeta
->
streamBackendId
,
pMeta
->
streamBackend
);
taosMemoryFree
(
st
ate
Path
);
taosMemoryFree
(
st
ream
Path
);
taosInitRWLatch
(
&
pMeta
->
lock
);
taosInitRWLatch
(
&
pMeta
->
lock
);
return
pMeta
;
return
pMeta
;
_err:
_err:
taosMemoryFree
(
streamPath
);
taosMemoryFree
(
pMeta
->
path
);
taosMemoryFree
(
pMeta
->
path
);
if
(
pMeta
->
pTasks
)
taosHashCleanup
(
pMeta
->
pTasks
);
if
(
pMeta
->
pTasks
)
taosHashCleanup
(
pMeta
->
pTasks
);
if
(
pMeta
->
pTaskList
)
taosArrayDestroy
(
pMeta
->
pTaskList
);
if
(
pMeta
->
pTaskList
)
taosArrayDestroy
(
pMeta
->
pTaskList
);
...
@@ -129,9 +134,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
...
@@ -129,9 +134,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
}
}
taosHashCleanup
(
pMeta
->
pTasks
);
taosHashCleanup
(
pMeta
->
pTasks
);
taosRemoveRef
(
pMeta
->
streamBackendId
,
pMeta
->
streamBackendRid
);
taosRemoveRef
(
streamBackendId
,
pMeta
->
streamBackendRid
);
// streamBackendCleanup(pMeta->streamBackend);
taosCloseRef
(
pMeta
->
streamBackendId
);
pMeta
->
pTaskList
=
taosArrayDestroy
(
pMeta
->
pTaskList
);
pMeta
->
pTaskList
=
taosArrayDestroy
(
pMeta
->
pTaskList
);
taosMemoryFree
(
pMeta
->
path
);
taosMemoryFree
(
pMeta
->
path
);
taosMemoryFree
(
pMeta
);
taosMemoryFree
(
pMeta
);
...
@@ -265,13 +268,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
...
@@ -265,13 +268,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask
**
ppTask
=
(
SStreamTask
**
)
taosHashGet
(
pMeta
->
pTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
**
ppTask
=
(
SStreamTask
**
)
taosHashGet
(
pMeta
->
pTasks
,
&
taskId
,
sizeof
(
int32_t
));
if
(
ppTask
)
{
if
(
ppTask
)
{
SStreamTask
*
pTask
=
*
ppTask
;
SStreamTask
*
pTask
=
*
ppTask
;
// taosWLockLatch(&pMeta->lock);
taosHashRemove
(
pMeta
->
pTasks
,
&
taskId
,
sizeof
(
int32_t
));
taosHashRemove
(
pMeta
->
pTasks
,
&
taskId
,
sizeof
(
int32_t
));
tdbTbDelete
(
pMeta
->
pTaskDb
,
&
taskId
,
sizeof
(
int32_t
),
pMeta
->
txn
);
tdbTbDelete
(
pMeta
->
pTaskDb
,
&
taskId
,
sizeof
(
int32_t
),
pMeta
->
txn
);
//
atomic_store_8
(
&
pTask
->
status
.
taskStatus
,
TASK_STATUS__DROPPING
);
atomic_store_8
(
&
pTask
->
status
.
taskStatus
,
TASK_STATUS__DROPPING
);
int32_t
num
=
taosArrayGetSize
(
pMeta
->
pTaskList
);
int32_t
num
=
taosArrayGetSize
(
pMeta
->
pTaskList
);
...
...
source/libs/stream/src/streamState.c
浏览文件 @
d1a746e4
...
@@ -115,7 +115,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
...
@@ -115,7 +115,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
pState
->
taskId
=
pTask
->
id
.
taskId
;
pState
->
taskId
=
pTask
->
id
.
taskId
;
pState
->
streamId
=
pTask
->
id
.
streamId
;
pState
->
streamId
=
pTask
->
id
.
streamId
;
#ifdef USE_ROCKSDB
#ifdef USE_ROCKSDB
qWarn
(
"open stream state1"
);
//
qWarn("open stream state1");
taosAcquireRef
(
pTask
->
pMeta
->
streamBackendId
,
pTask
->
pMeta
->
streamBackendRid
);
taosAcquireRef
(
pTask
->
pMeta
->
streamBackendId
,
pTask
->
pMeta
->
streamBackendRid
);
int
code
=
streamStateOpenBackend
(
pTask
->
pMeta
->
streamBackend
,
pState
);
int
code
=
streamStateOpenBackend
(
pTask
->
pMeta
->
streamBackend
,
pState
);
if
(
code
==
-
1
)
{
if
(
code
==
-
1
)
{
...
@@ -220,6 +220,7 @@ void streamStateClose(SStreamState* pState, bool remove) {
...
@@ -220,6 +220,7 @@ void streamStateClose(SStreamState* pState, bool remove) {
#ifdef USE_ROCKSDB
#ifdef USE_ROCKSDB
// streamStateCloseBackend(pState);
// streamStateCloseBackend(pState);
streamStateDestroy
(
pState
,
remove
);
streamStateDestroy
(
pState
,
remove
);
taosReleaseRef
(
pTask
->
pMeta
->
streamBackendId
,
pTask
->
pMeta
->
streamBackendRid
);
#else
#else
tdbCommit
(
pState
->
pTdbState
->
db
,
pState
->
pTdbState
->
txn
);
tdbCommit
(
pState
->
pTdbState
->
db
,
pState
->
pTdbState
->
txn
);
tdbPostCommit
(
pState
->
pTdbState
->
db
,
pState
->
pTdbState
->
txn
);
tdbPostCommit
(
pState
->
pTdbState
->
db
,
pState
->
pTdbState
->
txn
);
...
@@ -231,7 +232,6 @@ void streamStateClose(SStreamState* pState, bool remove) {
...
@@ -231,7 +232,6 @@ void streamStateClose(SStreamState* pState, bool remove) {
tdbTbClose
(
pState
->
pTdbState
->
pParTagDb
);
tdbTbClose
(
pState
->
pTdbState
->
pParTagDb
);
tdbClose
(
pState
->
pTdbState
->
db
);
tdbClose
(
pState
->
pTdbState
->
db
);
#endif
#endif
taosReleaseRef
(
pTask
->
pMeta
->
streamBackendId
,
pTask
->
pMeta
->
streamBackendRid
);
}
}
int32_t
streamStateBegin
(
SStreamState
*
pState
)
{
int32_t
streamStateBegin
(
SStreamState
*
pState
)
{
...
@@ -399,7 +399,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo
...
@@ -399,7 +399,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo
int32_t
code
=
0
;
int32_t
code
=
0
;
void
*
batch
=
streamStateCreateBatch
();
void
*
batch
=
streamStateCreateBatch
();
code
=
streamStatePutBatch
(
pState
,
"default"
,
batch
,
pKey
,
pVal
,
vLen
);
code
=
streamStatePutBatch
(
pState
,
"default"
,
batch
,
pKey
,
pVal
,
vLen
,
0
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
return
code
;
return
code
;
}
}
...
...
source/libs/stream/src/tstreamFileState.c
浏览文件 @
d1a746e4
...
@@ -15,6 +15,7 @@
...
@@ -15,6 +15,7 @@
#include "tstreamFileState.h"
#include "tstreamFileState.h"
#include "query.h"
#include "streamBackendRocksdb.h"
#include "streamBackendRocksdb.h"
#include "taos.h"
#include "taos.h"
#include "tcommon.h"
#include "tcommon.h"
...
@@ -154,9 +155,7 @@ void streamFileStateClear(SStreamFileState* pFileState) {
...
@@ -154,9 +155,7 @@ void streamFileStateClear(SStreamFileState* pFileState) {
clearExpiredRowBuff
(
pFileState
,
0
,
true
);
clearExpiredRowBuff
(
pFileState
,
0
,
true
);
}
}
bool
needClearDiskBuff
(
SStreamFileState
*
pFileState
)
{
bool
needClearDiskBuff
(
SStreamFileState
*
pFileState
)
{
return
pFileState
->
flushMark
>
0
;
}
return
pFileState
->
flushMark
>
0
;
}
void
popUsedBuffs
(
SStreamFileState
*
pFileState
,
SStreamSnapshot
*
pFlushList
,
uint64_t
max
,
bool
used
)
{
void
popUsedBuffs
(
SStreamFileState
*
pFileState
,
SStreamSnapshot
*
pFlushList
,
uint64_t
max
,
bool
used
)
{
uint64_t
i
=
0
;
uint64_t
i
=
0
;
...
@@ -325,7 +324,9 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
...
@@ -325,7 +324,9 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
void
releaseRowBuffPos
(
SRowBuffPos
*
pBuff
)
{
pBuff
->
beUsed
=
false
;
}
void
releaseRowBuffPos
(
SRowBuffPos
*
pBuff
)
{
pBuff
->
beUsed
=
false
;
}
SStreamSnapshot
*
getSnapshot
(
SStreamFileState
*
pFileState
)
{
SStreamSnapshot
*
getSnapshot
(
SStreamFileState
*
pFileState
)
{
clearExpiredRowBuff
(
pFileState
,
pFileState
->
maxTs
-
pFileState
->
deleteMark
,
false
);
int64_t
mark
=
(
INT64_MIN
+
pFileState
->
deleteMark
>=
pFileState
->
maxTs
)
?
INT64_MIN
:
pFileState
->
maxTs
-
pFileState
->
deleteMark
;
clearExpiredRowBuff
(
pFileState
,
mark
,
false
);
return
pFileState
->
usedBuffs
;
return
pFileState
->
usedBuffs
;
}
}
...
@@ -356,7 +357,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
...
@@ -356,7 +357,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
}
}
SStateKey
sKey
=
{.
key
=
*
((
SWinKey
*
)
pPos
->
pKey
),
.
opNum
=
((
SStreamState
*
)
pFileState
->
pFileStore
)
->
number
};
SStateKey
sKey
=
{.
key
=
*
((
SWinKey
*
)
pPos
->
pKey
),
.
opNum
=
((
SStreamState
*
)
pFileState
->
pFileStore
)
->
number
};
code
=
streamStatePutBatch
(
pFileState
->
pFileStore
,
"state"
,
batch
,
&
sKey
,
pPos
->
pRowBuff
,
pFileState
->
rowSize
);
code
=
streamStatePutBatch
(
pFileState
->
pFileStore
,
"state"
,
batch
,
&
sKey
,
pPos
->
pRowBuff
,
pFileState
->
rowSize
,
0
);
qDebug
(
"===stream===put %"
PRId64
" to disc, res %d"
,
sKey
.
key
.
ts
,
code
);
qDebug
(
"===stream===put %"
PRId64
" to disc, res %d"
,
sKey
.
key
.
ts
,
code
);
}
}
if
(
streamStateGetBatchSize
(
batch
)
>
0
)
{
if
(
streamStateGetBatchSize
(
batch
)
>
0
)
{
...
@@ -372,7 +373,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
...
@@ -372,7 +373,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t
len
=
0
;
int32_t
len
=
0
;
sprintf
(
keyBuf
,
"%s:%"
PRId64
""
,
taskKey
,
((
SStreamState
*
)
pFileState
->
pFileStore
)
->
checkPointId
);
sprintf
(
keyBuf
,
"%s:%"
PRId64
""
,
taskKey
,
((
SStreamState
*
)
pFileState
->
pFileStore
)
->
checkPointId
);
streamFileStateEncode
(
&
pFileState
->
flushMark
,
&
valBuf
,
&
len
);
streamFileStateEncode
(
&
pFileState
->
flushMark
,
&
valBuf
,
&
len
);
code
=
streamStatePutBatch
(
pFileState
->
pFileStore
,
"default"
,
batch
,
keyBuf
,
valBuf
,
len
);
code
=
streamStatePutBatch
(
pFileState
->
pFileStore
,
"default"
,
batch
,
keyBuf
,
valBuf
,
len
,
0
);
taosMemoryFree
(
valBuf
);
taosMemoryFree
(
valBuf
);
}
}
{
{
...
@@ -381,7 +382,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
...
@@ -381,7 +382,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t
len
=
0
;
int32_t
len
=
0
;
memcpy
(
keyBuf
,
taskKey
,
strlen
(
taskKey
));
memcpy
(
keyBuf
,
taskKey
,
strlen
(
taskKey
));
len
=
sprintf
(
valBuf
,
"%"
PRId64
""
,
((
SStreamState
*
)
pFileState
->
pFileStore
)
->
checkPointId
);
len
=
sprintf
(
valBuf
,
"%"
PRId64
""
,
((
SStreamState
*
)
pFileState
->
pFileStore
)
->
checkPointId
);
code
=
streamStatePutBatch
(
pFileState
->
pFileStore
,
"default"
,
batch
,
keyBuf
,
valBuf
,
len
);
code
=
streamStatePutBatch
(
pFileState
->
pFileStore
,
"default"
,
batch
,
keyBuf
,
valBuf
,
len
,
0
);
}
}
streamStatePutBatch_rocksdb
(
pFileState
->
pFileStore
,
batch
);
streamStatePutBatch_rocksdb
(
pFileState
->
pFileStore
,
batch
);
}
}
...
@@ -440,7 +441,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
...
@@ -440,7 +441,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t
recoverSnapshot
(
SStreamFileState
*
pFileState
)
{
int32_t
recoverSnapshot
(
SStreamFileState
*
pFileState
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
deleteExpiredCheckPoint
(
pFileState
,
pFileState
->
maxTs
-
pFileState
->
deleteMark
);
int64_t
mark
=
(
INT64_MIN
+
pFileState
->
deleteMark
>=
pFileState
->
maxTs
)
?
INT64_MIN
:
pFileState
->
maxTs
-
pFileState
->
deleteMark
;
deleteExpiredCheckPoint
(
pFileState
,
mark
);
void
*
pStVal
=
NULL
;
void
*
pStVal
=
NULL
;
int32_t
len
=
0
;
int32_t
len
=
0
;
...
...
source/libs/stream/test/CMakeLists.txt
浏览文件 @
d1a746e4
...
@@ -10,7 +10,7 @@ ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
...
@@ -10,7 +10,7 @@ ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
TARGET_LINK_LIBRARIES
(
TARGET_LINK_LIBRARIES
(
streamUpdateTest
streamUpdateTest
PUBLIC os util common gtest stream
PUBLIC os util common gtest
gtest_main
stream
)
)
TARGET_INCLUDE_DIRECTORIES
(
TARGET_INCLUDE_DIRECTORIES
(
...
...
source/libs/stream/test/tstreamUpdateTest.cpp
浏览文件 @
d1a746e4
#include <gtest/gtest.h>
#include <gtest/gtest.h>
#include "streamBackendRocksdb.h"
#include "tstream.h"
#include "tstreamUpdate.h"
#include "tstreamUpdate.h"
#include "ttime.h"
#include "ttime.h"
using
namespace
std
;
using
namespace
std
;
#define MAX_NUM_SCALABLE_BF 100000
#define MAX_NUM_SCALABLE_BF 100000
class
StreamStateEnv
:
public
::
testing
::
Test
{
protected:
virtual
void
SetUp
()
{
streamMetaInit
();
backend
=
streamBackendInit
(
path
);
}
virtual
void
TearDown
()
{
streamMetaCleanup
();
// indexClose(index);
}
const
char
*
path
=
TD_TMP_DIR_PATH
"stream"
;
void
*
backend
;
};
bool
equalSBF
(
SScalableBf
*
left
,
SScalableBf
*
right
)
{
bool
equalSBF
(
SScalableBf
*
left
,
SScalableBf
*
right
)
{
if
(
left
->
growth
!=
right
->
growth
)
return
false
;
if
(
left
->
growth
!=
right
->
growth
)
return
false
;
if
(
left
->
numBits
!=
right
->
numBits
)
return
false
;
if
(
left
->
numBits
!=
right
->
numBits
)
return
false
;
...
@@ -191,8 +208,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
...
@@ -191,8 +208,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
// updateInfoDestroy(pSU6);
// updateInfoDestroy(pSU6);
// updateInfoDestroy(pSU7);
// updateInfoDestroy(pSU7);
}
}
// TEST()
int
main
(
int
argc
,
char
*
argv
[])
{
TEST
(
StreamStateEnv
,
test1
)
{}
testing
::
InitGoogleTest
(
&
argc
,
argv
);
// int main(int argc, char *argv[]) {
return
RUN_ALL_TESTS
();
// testing::InitGoogleTest(&argc, argv);
}
// return RUN_ALL_TESTS();
\ No newline at end of file
// }
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录