Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b4bd6a4f
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b4bd6a4f
编写于
5月 10, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor code
上级
8356533e
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
52 addition
and
71 deletion
+52
-71
source/libs/stream/src/streamBackendRocksdb.c
source/libs/stream/src/streamBackendRocksdb.c
+52
-71
未找到文件。
source/libs/stream/src/streamBackendRocksdb.c
浏览文件 @
b4bd6a4f
...
...
@@ -649,17 +649,9 @@ const char* compactFilteFactoryName(void* arg) {
void
destroyCompactFilte
(
void
*
arg
)
{
(
void
)
arg
;
}
unsigned
char
compactFilte
(
void
*
arg
,
int
level
,
const
char
*
key
,
size_t
klen
,
const
char
*
val
,
size_t
vlen
,
char
**
newval
,
size_t
*
newvlen
,
unsigned
char
*
value_changed
)
{
// int64_t unixTime = taosGetTimestampMs();
if
(
streamStateValueIsStale
((
char
*
)
val
))
{
return
1
;
}
// SStreamValue value;
// memset(&value, 0, sizeof(value));
// streamValueDecode(&value, (char*)val);
// taosMemoryFree(value.data);
// if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) {
// return 1;
// }
return
0
;
}
const
char
*
compactFilteName
(
void
*
arg
)
{
return
"stream_filte"
;
}
...
...
@@ -703,7 +695,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
memcpy
(
cfNames
[
0
],
"default"
,
strlen
(
"default"
));
continue
;
}
q
Error
(
"cf name %s"
,
idstr
);
q
Debug
(
"cf name %s"
,
idstr
);
GEN_COLUMN_FAMILY_NAME
(
cfNames
[
i
],
idstr
,
ginitDict
[(
i
-
1
)
%
(
cfLen
)].
key
);
if
(
i
%
cfLen
==
0
)
{
...
...
@@ -711,9 +703,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
if
(
pIter
!=
NULL
)
idstr
=
taosHashGetKey
(
pIter
,
&
keyLen
);
}
}
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
qError
(
"cf name %s"
,
cfNames
[
i
]);
}
rocksdb_options_t
**
cfOpts
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
rocksdb_options_t
*
));
RocksdbCfParam
*
params
=
taosMemoryCalloc
(
nSize
*
cfLen
+
1
,
sizeof
(
RocksdbCfParam
*
));
for
(
int
i
=
0
;
i
<
nSize
*
cfLen
+
1
;
i
++
)
{
...
...
@@ -1012,53 +1001,51 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
taosMemoryFree(ttlV); \
} while (0);
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \
qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \
if (err != NULL) taosMemoryFree(err); \
code = -1; \
} else { \
char * p = NULL, *end = NULL; \
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \
if (len < 0) { \
qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \
code = -1; \
} else { \
qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \
} \
if (pVal != NULL) { \
*pVal = p; \
} else { \
taosMemoryFree(p); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = len; \
} \
if (err != NULL) { \
taosMemoryFree(err); \
qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
code = -1; \
} else { \
if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
} \
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
break; \
} \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \
if (err == NULL) { \
qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
funcname); \
} else { \
qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
err); \
taosMemoryFreeClear(err); \
} \
code = -1; \
} else { \
char* p = NULL; \
int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (len < 0) { \
qDebug("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \
funcname); \
code = -1; \
} else { \
qDebug("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \
len); \
} \
taosMemoryFree(val); \
if (vLen != NULL) *vLen = len; \
} \
if (code == 0) \
qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \
} while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
...
...
@@ -1133,10 +1120,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
// eLen);
if
(
err
!=
NULL
)
{
qWarn
(
"failed to delete range cf(state) err: %s, "
"start: %s, end:%s"
,
err
,
toStringStart
,
toStringEnd
);
qWarn
(
"failed to delete range cf(state) start: %s, end:%s, reason:%s"
,
toStringStart
,
toStringEnd
,
err
);
taosMemoryFree
(
err
);
}
...
...
@@ -1588,20 +1572,17 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
if
(
!
rocksdb_iter_valid
(
pCur
->
iter
)
||
iterValueIsStale
(
pCur
->
iter
))
{
return
-
1
;
}
size_t
t
len
;
char
*
keyStr
=
(
char
*
)
rocksdb_iter_key
(
pCur
->
iter
,
&
t
len
);
size_t
klen
,
v
len
;
char
*
keyStr
=
(
char
*
)
rocksdb_iter_key
(
pCur
->
iter
,
&
k
len
);
winKeyDecode
(
&
winKey
,
keyStr
);
size_t
vlen
=
0
;
const
char
*
valStr
=
rocksdb_iter_value
(
pCur
->
iter
,
&
vlen
);
char
*
dst
=
NULL
;
int32_t
len
=
decodeValueFunc
((
void
*
)
valStr
,
vlen
,
NULL
,
&
dst
);
//
char* dst = NULL;
int32_t
len
=
decodeValueFunc
((
void
*
)
valStr
,
vlen
,
NULL
,
(
char
**
)
pVal
);
if
(
len
<
0
)
{
return
-
1
;
}
if
(
pVal
!=
NULL
)
*
pVal
=
(
char
*
)
dst
;
if
(
pVLen
!=
NULL
)
*
pVLen
=
vlen
;
if
(
pVLen
!=
NULL
)
*
pVLen
=
len
;
*
pKey
=
winKey
;
return
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录