Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
80f348de
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1191
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
80f348de
编写于
4月 14, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix recover
上级
efa7b48c
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
19 addition
and
39 deletion
+19
-39
source/libs/stream/src/streamStateRocksdb.c
source/libs/stream/src/streamStateRocksdb.c
+15
-28
source/libs/stream/src/tstreamFileState.c
source/libs/stream/src/tstreamFileState.c
+4
-11
未找到文件。
source/libs/stream/src/streamStateRocksdb.c
浏览文件 @
80f348de
...
@@ -489,7 +489,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -489,7 +489,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
int i = streamGetInit(funcname); \
int i = streamGetInit(funcname); \
if (i < 0) { \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
code = -1; \
break; \
} \
} \
char toString[128] = {0}; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
...
@@ -515,7 +516,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -515,7 +516,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
int i = streamGetInit(funcname); \
int i = streamGetInit(funcname); \
if (i < 0) { \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
code = -1; \
break; \
} \
} \
char toString[128] = {0}; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
...
@@ -550,7 +552,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
...
@@ -550,7 +552,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
int i = streamGetInit(funcname); \
int i = streamGetInit(funcname); \
if (i < 0) { \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
code = -1; \
break; \
} \
} \
char toString[128] = {0}; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
...
@@ -970,36 +973,20 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
...
@@ -970,36 +973,20 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
}
}
SStreamStateCur
*
streamStateSeekToLast_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
streamStateSeekToLast_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
qDebug
(
"streamStateGetCur_rocksdb"
);
qDebug
(
"streamStateGetCur_rocksdb"
);
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
int32_t
code
=
0
;
const
SStateKey
maxStateKey
=
{.
key
=
{.
groupId
=
UINT64_MAX
,
.
ts
=
INT64_MAX
},
.
opNum
=
INT64_MAX
};
STREAM_STATE_PUT_ROCKSDB
(
pState
,
"state"
,
&
maxStateKey
,
""
,
0
);
char
buf
[
128
]
=
{
0
};
int32_t
klen
=
stateKeyEncode
((
void
*
)
&
maxStateKey
,
buf
);
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
return
NULL
;
if
(
pCur
==
NULL
)
return
NULL
;
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
&
pCur
->
snapshot
,
&
pCur
->
readOpt
);
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
&
pCur
->
snapshot
,
&
pCur
->
readOpt
);
rocksdb_iter_seek
(
pCur
->
iter
,
buf
,
(
size_t
)
klen
);
rocksdb_iter_prev
(
pCur
->
iter
);
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
STREAM_STATE_DEL_ROCKSDB
(
pState
,
"state"
,
&
maxStateKey
);
char
buf
[
128
]
=
{
0
};
int
len
=
stateKeyEncode
((
void
*
)
&
sKey
,
buf
);
rocksdb_iter_seek
(
pCur
->
iter
,
buf
,
len
);
if
(
!
rocksdb_iter_valid
(
pCur
->
iter
))
{
rocksdb_iter_seek_to_last
(
pCur
->
iter
);
}
else
{
rocksdb_iter_seek_to_last
(
pCur
->
iter
);
}
return
pCur
;
// rocksdb_iter_seek(pCur->iter, buf, len);
// if (rocksdb_iter_valid(pCur->iter)) {
// SStateKey curKey;
// size_t kLen = 0;
// char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
// stateKeyDecode((void*)&curKey, keyStr);
// if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) {
// pCur->number = pState->number;
// return pCur;
// }
// }
// streamStateFreeCur(pCur);
return
pCur
;
return
pCur
;
}
}
SStreamStateCur
*
streamStateGetCur_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
SStreamStateCur
*
streamStateGetCur_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
...
...
source/libs/stream/src/tstreamFileState.c
浏览文件 @
80f348de
...
@@ -327,7 +327,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
...
@@ -327,7 +327,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
SListIter
iter
=
{
0
};
SListIter
iter
=
{
0
};
tdListInitIter
(
pSnapshot
,
&
iter
,
TD_LIST_FORWARD
);
tdListInitIter
(
pSnapshot
,
&
iter
,
TD_LIST_FORWARD
);
const
int32_t
BATCH_LIMIT
=
128
;
const
int32_t
BATCH_LIMIT
=
256
;
SListNode
*
pNode
=
NULL
;
SListNode
*
pNode
=
NULL
;
void
*
batch
=
streamStateCreateBatch
();
void
*
batch
=
streamStateCreateBatch
();
...
@@ -414,6 +414,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
...
@@ -414,6 +414,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
sscanf
(
val
,
"%"
PRId64
""
,
&
ts
);
sscanf
(
val
,
"%"
PRId64
""
,
&
ts
);
taosMemoryFree
(
val
);
taosMemoryFree
(
val
);
if
(
ts
<
mark
)
{
if
(
ts
<
mark
)
{
// statekey winkey.ts < mark
forceRemoveCheckpoint
(
pFileState
,
i
);
forceRemoveCheckpoint
(
pFileState
,
i
);
break
;
break
;
}
else
{
}
else
{
...
@@ -428,16 +429,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
...
@@ -428,16 +429,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
void
*
pStVal
=
NULL
;
void
*
pStVal
=
NULL
;
int32_t
len
=
0
;
int32_t
len
=
0
;
SWinKey
key
=
{.
groupId
=
0
,
.
ts
=
0
};
SWinKey
key
=
{.
groupId
=
0
,
.
ts
=
0
};
// SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key);
SStreamStateCur
*
pCur
=
streamStateSeekToLast_rocksdb
(
pFileState
->
pFileStore
,
&
key
);
// if (!pCur) {
// return TSDB_CODE_FAILED;
// }
// code = streamStateSeekLast(pFileState->pFileStore, pCur);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
SStreamStateCur
*
pCur
=
streamStateSeekToLast_rocksdb
(
pState
,
&
key
);
if
(
pCur
==
NULL
)
{
if
(
pCur
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录