Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4c70cfdf
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4c70cfdf
编写于
4月 14, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
startGroupTableMe
上级
1b255231
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
82 addition
and
41 deletion
+82
-41
source/libs/stream/inc/streamBackendRocksdb.h
source/libs/stream/inc/streamBackendRocksdb.h
+5
-3
source/libs/stream/src/streamStateRocksdb.c
source/libs/stream/src/streamStateRocksdb.c
+63
-29
source/libs/stream/src/tstreamFileState.c
source/libs/stream/src/tstreamFileState.c
+14
-9
未找到文件。
source/libs/stream/inc/streamBackendRocksdb.h
浏览文件 @
4c70cfdf
...
...
@@ -57,9 +57,10 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
int32_t
streamStateStateAddIfNotExist_rocksdb
(
SStreamState
*
pState
,
SSessionKey
*
key
,
char
*
pKeyData
,
int32_t
keyDataLen
,
state_key_cmpr_fn
fn
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamStateGetFirst_rocksdb
(
SStreamState
*
pState
,
SWinKey
*
key
);
int32_t
streamStateSessionClear_rocksdb
(
SStreamState
*
pState
);
int32_t
streamStateCurPrev_rocksdb
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
);
int32_t
streamStateGetFirst_rocksdb
(
SStreamState
*
pState
,
SWinKey
*
key
);
int32_t
streamStateSessionClear_rocksdb
(
SStreamState
*
pState
);
int32_t
streamStateCurPrev_rocksdb
(
SStreamState
*
pState
,
SStreamStateCur
*
pCur
);
SStreamStateCur
*
streamStateSeekToLast_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
);
int32_t
streamStateGetGroupKVByCur_rocksdb
(
SStreamStateCur
*
pCur
,
SWinKey
*
pKey
,
const
void
**
pVal
,
int32_t
*
pVLen
);
...
...
@@ -90,6 +91,7 @@ int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pV
int32_t
streamDefaultGet_rocksdb
(
SStreamState
*
pState
,
const
void
*
key
,
void
**
pVal
,
int32_t
*
pVLen
);
int32_t
streamDefaultDel_rocksdb
(
SStreamState
*
pState
,
const
void
*
key
);
int32_t
streamDefaultIterGet_rocksdb
(
SStreamState
*
pState
,
const
void
*
start
,
const
void
*
end
,
SArray
*
result
);
void
*
streamDefaultIterCreate_rocksdb
(
SStreamState
*
pState
);
int32_t
streamDefaultIterValid_rocksdb
(
void
*
iter
);
void
streamDefaultIterSeek_rocksdb
(
void
*
iter
,
const
char
*
key
);
...
...
source/libs/stream/src/streamStateRocksdb.c
浏览文件 @
4c70cfdf
...
...
@@ -718,37 +718,37 @@ char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
// rocksdb_iter_destroy(pIter);
// return code;
// }
int32_t
streamDefaultIter_rocksdb
(
SStreamState
*
pState
,
const
void
*
start
,
const
void
*
end
,
SArray
*
result
)
{
//
int code = 0;
//
char* err = NULL;
int32_t
streamDefaultIter
Get
_rocksdb
(
SStreamState
*
pState
,
const
void
*
start
,
const
void
*
end
,
SArray
*
result
)
{
int
code
=
0
;
char
*
err
=
NULL
;
//
rocksdb_snapshot_t* snapshot = NULL;
//
rocksdb_readoptions_t* readopts = NULL;
//
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
//
if (pIter == NULL) {
//
return -1;
//
}
rocksdb_snapshot_t
*
snapshot
=
NULL
;
rocksdb_readoptions_t
*
readopts
=
NULL
;
rocksdb_iterator_t
*
pIter
=
streamStateIterCreate
(
pState
,
"default"
,
&
snapshot
,
&
readopts
);
if
(
pIter
==
NULL
)
{
return
-
1
;
}
//
rocksdb_iter_seek(pIter, start, strlen(start));
//
while (rocksdb_iter_valid(pIter)) {
//
const char* key = rocksdb_iter_key(pIter, NULL);
//
if (end != NULL && strcmp(key, end) > 0) {
//
break;
//
}
//
if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
//
int64_t checkPoint = 0;
//
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
//
taosArrayPush(result, &checkPoint);
//
}
//
} else {
//
break;
//
}
//
rocksdb_iter_next(pIter);
//
}
//
rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
//
rocksdb_readoptions_destroy(readopts);
//
rocksdb_iter_destroy(pIter);
//
return code;
rocksdb_iter_seek
(
pIter
,
start
,
strlen
(
start
));
while
(
rocksdb_iter_valid
(
pIter
))
{
const
char
*
key
=
rocksdb_iter_key
(
pIter
,
NULL
);
if
(
end
!=
NULL
&&
strcmp
(
key
,
end
)
>
0
)
{
break
;
}
if
(
strncmp
(
key
,
start
,
strlen
(
start
))
==
0
&&
strlen
(
key
)
>=
strlen
(
start
)
+
1
)
{
int64_t
checkPoint
=
0
;
if
(
sscanf
(
key
+
strlen
(
key
),
":%"
PRId64
""
,
&
checkPoint
)
==
1
)
{
taosArrayPush
(
result
,
&
checkPoint
);
}
}
else
{
break
;
}
rocksdb_iter_next
(
pIter
);
}
rocksdb_release_snapshot
(
pState
->
pTdbState
->
rocksdb
,
snapshot
);
rocksdb_readoptions_destroy
(
readopts
);
rocksdb_iter_destroy
(
pIter
);
return
code
;
}
int32_t
streamStateGet_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
,
void
**
pVal
,
int32_t
*
pVLen
)
{
...
...
@@ -968,6 +968,40 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
memset
(
*
pVal
,
0
,
size
);
return
0
;
}
SStreamStateCur
*
streamStateSeekToLast_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
qDebug
(
"streamStateGetCur_rocksdb"
);
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
if
(
pCur
==
NULL
)
return
NULL
;
pCur
->
db
=
pState
->
pTdbState
->
rocksdb
;
pCur
->
iter
=
streamStateIterCreate
(
pState
,
"state"
,
&
pCur
->
snapshot
,
&
pCur
->
readOpt
);
SStateKey
sKey
=
{.
key
=
*
key
,
.
opNum
=
pState
->
number
};
char
buf
[
128
]
=
{
0
};
int
len
=
stateKeyEncode
((
void
*
)
&
sKey
,
buf
);
rocksdb_iter_seek
(
pCur
->
iter
,
buf
,
len
);
if
(
!
rocksdb_iter_valid
(
pCur
->
iter
))
{
rocksdb_iter_seek_to_last
(
pCur
->
iter
);
}
else
{
rocksdb_iter_seek_to_last
(
pCur
->
iter
);
}
return
pCur
;
// rocksdb_iter_seek(pCur->iter, buf, len);
// if (rocksdb_iter_valid(pCur->iter)) {
// SStateKey curKey;
// size_t kLen = 0;
// char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
// stateKeyDecode((void*)&curKey, keyStr);
// if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) {
// pCur->number = pState->number;
// return pCur;
// }
// }
// streamStateFreeCur(pCur);
return
pCur
;
}
SStreamStateCur
*
streamStateGetCur_rocksdb
(
SStreamState
*
pState
,
const
SWinKey
*
key
)
{
qDebug
(
"streamStateGetCur_rocksdb"
);
SStreamStateCur
*
pCur
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamStateCur
));
...
...
source/libs/stream/src/tstreamFileState.c
浏览文件 @
4c70cfdf
...
...
@@ -382,7 +382,7 @@ int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId
int32_t
getSnapshotIdList
(
SStreamFileState
*
pFileState
,
SArray
*
list
)
{
const
char
*
taskKey
=
"streamFileState"
;
return
streamDefaultIter_rocksdb
(
pFileState
->
pFileStore
,
taskKey
,
NULL
,
list
);
return
streamDefaultIter
Get
_rocksdb
(
pFileState
->
pFileStore
,
taskKey
,
NULL
,
list
);
}
int32_t
deleteExpiredCheckPoint
(
SStreamFileState
*
pFileState
,
TSKEY
mark
)
{
...
...
@@ -428,15 +428,20 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
void
*
pStVal
=
NULL
;
int32_t
len
=
0
;
SWinKey
key
=
{.
groupId
=
0
,
.
ts
=
0
};
SStreamStateCur
*
pCur
=
streamStateGetCur_rocksdb
(
pFileState
->
pFileStore
,
&
key
);
if
(
!
pCur
)
{
return
TSDB_CODE_FAILED
;
}
code
=
streamStateSeekLast
(
pFileState
->
pFileStore
,
pCur
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
SWinKey
key
=
{.
groupId
=
0
,
.
ts
=
0
};
// SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key);
// if (!pCur) {
// return TSDB_CODE_FAILED;
// }
// code = streamStateSeekLast(pFileState->pFileStore, pCur);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
SStreamStateCur
*
pCur
=
streamStateSeekToLast_rocksdb
(
pState
,
&
key
);
if
(
pCur
==
NULL
)
{
return
-
1
;
}
while
(
code
==
TSDB_CODE_SUCCESS
)
{
if
(
pFileState
->
curRowCount
==
pFileState
->
maxRowCount
)
{
break
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录