Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2d37aeab
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
2d37aeab
编写于
3月 20, 2023
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add backend
上级
05a205d3
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
373 addition
and
39 deletion
+373
-39
contrib/test/rocksdb/main.c
contrib/test/rocksdb/main.c
+200
-22
include/libs/stream/streamState.h
include/libs/stream/streamState.h
+7
-1
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+166
-16
未找到文件。
contrib/test/rocksdb/main.c
浏览文件 @
2d37aeab
#include <assert.h>
#include <bits/stdint-uintn.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
...
...
@@ -9,38 +10,215 @@
const
char
DBPath
[]
=
"rocksdb_c_simple_example"
;
const
char
DBBackupPath
[]
=
"/tmp/rocksdb_c_simple_example_backup"
;
#define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b)))
static
int32_t
taosEncodeFixedU64
(
void
**
buf
,
uint64_t
value
)
{
if
(
buf
!=
NULL
)
{
((
uint8_t
*
)(
*
buf
))[
0
]
=
value
&
0xff
;
((
uint8_t
*
)(
*
buf
))[
1
]
=
(
value
>>
8
)
&
0xff
;
((
uint8_t
*
)(
*
buf
))[
2
]
=
(
value
>>
16
)
&
0xff
;
((
uint8_t
*
)(
*
buf
))[
3
]
=
(
value
>>
24
)
&
0xff
;
((
uint8_t
*
)(
*
buf
))[
4
]
=
(
value
>>
32
)
&
0xff
;
((
uint8_t
*
)(
*
buf
))[
5
]
=
(
value
>>
40
)
&
0xff
;
((
uint8_t
*
)(
*
buf
))[
6
]
=
(
value
>>
48
)
&
0xff
;
((
uint8_t
*
)(
*
buf
))[
7
]
=
(
value
>>
56
)
&
0xff
;
*
buf
=
POINTER_SHIFT
(
*
buf
,
sizeof
(
value
));
}
return
(
int32_t
)
sizeof
(
value
);
}
static
void
*
taosDecodeFixedU64
(
const
void
*
buf
,
uint64_t
*
value
)
{
((
uint8_t
*
)
value
)[
7
]
=
((
uint8_t
*
)
buf
)[
0
];
((
uint8_t
*
)
value
)[
6
]
=
((
uint8_t
*
)
buf
)[
1
];
((
uint8_t
*
)
value
)[
5
]
=
((
uint8_t
*
)
buf
)[
2
];
((
uint8_t
*
)
value
)[
4
]
=
((
uint8_t
*
)
buf
)[
3
];
((
uint8_t
*
)
value
)[
3
]
=
((
uint8_t
*
)
buf
)[
4
];
((
uint8_t
*
)
value
)[
2
]
=
((
uint8_t
*
)
buf
)[
5
];
((
uint8_t
*
)
value
)[
1
]
=
((
uint8_t
*
)
buf
)[
6
];
((
uint8_t
*
)
value
)[
0
]
=
((
uint8_t
*
)
buf
)[
7
];
return
POINTER_SHIFT
(
buf
,
sizeof
(
*
value
));
}
typedef
struct
KV
{
uint64_t
k1
;
uint64_t
k2
;
}
KV
;
int
kvSerial
(
KV
*
kv
,
char
*
buf
)
{
int
len
=
0
;
buf
[
0
]
=
'a'
;
buf
+=
1
;
len
+=
taosEncodeFixedU64
((
void
**
)
&
buf
,
kv
->
k1
);
len
+=
taosEncodeFixedU64
((
void
**
)
&
buf
,
kv
->
k2
);
return
len
;
}
const
char
*
kvDBName
(
void
*
name
)
{
return
"kvDBname"
;
}
int
kvDBComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
)
{
KV
w1
,
w2
;
memset
(
&
w1
,
0
,
sizeof
(
w1
));
memset
(
&
w2
,
0
,
sizeof
(
w2
));
char
*
p1
=
(
char
*
)
aBuf
;
char
*
p2
=
(
char
*
)
bBuf
;
p1
+=
1
;
p2
+=
1
;
p1
=
taosDecodeFixedU64
(
p1
,
&
w1
.
k1
);
p2
=
taosDecodeFixedU64
(
p2
,
&
w2
.
k1
);
p1
=
taosDecodeFixedU64
(
p1
,
&
w1
.
k2
);
p2
=
taosDecodeFixedU64
(
p2
,
&
w2
.
k2
);
if
(
w1
.
k1
<
w2
.
k1
)
{
return
-
1
;
}
else
if
(
w1
.
k1
>
w2
.
k1
)
{
return
1
;
}
if
(
w1
.
k2
<
w2
.
k2
)
{
return
-
1
;
}
else
if
(
w1
.
k2
>
w2
.
k2
)
{
return
1
;
}
return
0
;
}
int
kvDeserial
(
KV
*
kv
,
char
*
buf
)
{
char
*
p1
=
(
char
*
)
buf
;
p1
+=
1
;
p1
=
taosDecodeFixedU64
(
p1
,
&
kv
->
k1
);
p1
=
taosDecodeFixedU64
(
p1
,
&
kv
->
k2
);
return
0
;
}
int
main
(
int
argc
,
char
const
*
argv
[])
{
rocksdb_t
*
db
;
rocksdb_t
*
db
;
rocksdb_backup_engine_t
*
be
;
rocksdb_options_t
*
options
=
rocksdb_options_create
();
rocksdb_options_set_create_if_missing
(
options
,
1
);
// open DB
char
*
err
=
NULL
;
db
=
rocksdb_open
(
options
,
DBPath
,
&
err
);
char
*
err
=
NULL
;
const
char
*
path
=
"/tmp/db"
;
// Write
rocksdb_writeoptions_t
*
writeoptions
=
rocksdb_writeoptions_create
();
rocksdb_put
(
db
,
writeoptions
,
"key"
,
3
,
"value"
,
5
,
&
err
);
rocksdb_options_t
*
opt
=
rocksdb_options_create
();
rocksdb_options_set_create_if_missing
(
opt
,
1
);
rocksdb_options_set_create_missing_column_families
(
opt
,
1
);
const
char
*
cfName
[]
=
{
"default"
,
"cf1"
};
int
len
=
sizeof
(
cfName
)
/
sizeof
(
cfName
[
0
]);
// Read
rocksdb_readoptions_t
*
readoptions
=
rocksdb_readoptions_create
();
rocksdb_readoptions_set_snapshot
(
readoptions
,
rocksdb_create_snapshot
(
db
));
size_t
vallen
=
0
;
char
*
val
=
rocksdb_get
(
db
,
readoptions
,
"key"
,
3
,
&
vallen
,
&
err
);
printf
(
"val:%s
\n
"
,
val
);
const
rocksdb_options_t
**
cfOpt
=
malloc
(
len
*
sizeof
(
rocksdb_options_t
*
));
for
(
int
i
=
0
;
i
<
len
;
i
++
)
{
cfOpt
[
i
]
=
opt
;
}
// Update
// rocksdb_put(db, writeoptions, "key", 3, "eulav", 5
, &err);
rocksdb_column_family_handle_t
**
cfHandle
=
malloc
(
len
*
sizeof
(
rocksdb_column_family_handle_t
*
));
db
=
rocksdb_open_column_families
(
opt
,
"test"
,
len
,
cfName
,
cfOpt
,
cfHandle
,
&
err
);
// Delete
rocksdb_delete
(
db
,
writeoptions
,
"key"
,
3
,
&
err
);
{
rocksdb_readoptions_t
*
rOpt
=
rocksdb_readoptions_create
();
size_t
vlen
=
0
;
// Read again
val
=
rocksdb_get
(
db
,
readoptions
,
"key"
,
3
,
&
vallen
,
&
err
);
printf
(
"val:%s
\n
"
,
val
);
char
*
v
=
rocksdb_get_cf
(
db
,
rOpt
,
cfHandle
[
0
],
"key"
,
strlen
(
"key"
),
&
vlen
,
&
err
);
printf
(
"Get value %s, and len = %d
\n
"
,
v
,
(
int
)
vlen
);
char
*
v1
=
rocksdb_get_cf
(
db
,
rOpt
,
cfHandle
[
1
],
"key"
,
strlen
(
"key"
),
&
vlen
,
&
err
);
printf
(
"Get value %s, and len = %d
\n
"
,
v1
,
(
int
)
vlen
);
rocksdb_readoptions_destroy
(
rOpt
);
}
rocksdb_writeoptions_t
*
wOpt
=
rocksdb_writeoptions_create
();
rocksdb_writebatch_t
*
wBatch
=
rocksdb_writebatch_create
();
rocksdb_writebatch_put_cf
(
wBatch
,
cfHandle
[
0
],
"key"
,
strlen
(
"key"
),
"value"
,
strlen
(
"value"
));
rocksdb_write
(
db
,
wOpt
,
wBatch
,
&
err
);
rocksdb_readoptions_t
*
rOpt
=
rocksdb_readoptions_create
();
size_t
vlen
=
0
;
char
*
v
=
rocksdb_get_cf
(
db
,
rOpt
,
cfHandle
[
0
],
"key"
,
strlen
(
"key"
),
&
vlen
,
&
err
);
printf
(
"Get value %s, and len = %d
\n
"
,
v
,
(
int
)
vlen
);
rocksdb_column_family_handle_destroy
(
cfHandle
[
0
]);
rocksdb_column_family_handle_destroy
(
cfHandle
[
1
]);
rocksdb_close
(
db
);
// {
// // rocksdb_options_t *Options = rocksdb_options_create();
// db = rocksdb_open(comm, path, &err);
// if (db != NULL) {
// rocksdb_options_t *cfo = rocksdb_options_create_copy(comm);
// rocksdb_comparator_t *cmp1 = rocksdb_comparator_create(NULL, NULL, kvDBComp, kvDBName);
// rocksdb_options_set_comparator(cfo, cmp1);
// rocksdb_column_family_handle_t *handle = rocksdb_create_column_family(db, cfo, "cf1", &err);
// rocksdb_column_family_handle_destroy(handle);
// rocksdb_close(db);
// db = NULL;
// }
// }
// int ncf = 2;
// rocksdb_column_family_handle_t **pHandle = malloc(ncf * sizeof(rocksdb_column_family_handle_t *));
// {
// rocksdb_options_t *options = rocksdb_options_create_copy(comm);
// rocksdb_comparator_t *cmp1 = rocksdb_comparator_create(NULL, NULL, kvDBComp, kvDBName);
// rocksdb_options_t *dbOpts1 = rocksdb_options_create_copy(comm);
// rocksdb_options_t *dbOpts2 = rocksdb_options_create_copy(comm);
// rocksdb_options_set_comparator(dbOpts2, cmp1);
// // rocksdb_column_family_handle_t *cf = rocksdb_create_column_family(db, dbOpts1, "cmp1", &err);
// const char *pName[] = {"default", "cf1"};
// const rocksdb_options_t **pOpts = malloc(ncf * sizeof(rocksdb_options_t *));
// pOpts[0] = dbOpts1;
// pOpts[1] = dbOpts2;
// rocksdb_options_t *allOptions = rocksdb_options_create_copy(comm);
// db = rocksdb_open_column_families(allOptions, "test", ncf, pName, pOpts, pHandle, &err);
// }
// // rocksdb_options_t *options = rocksdb_options_create();
// // rocksdb_options_set_create_if_missing(options, 1);
// // //rocksdb_open_column_families(const rocksdb_options_t *options, const char *name, int num_column_families,
// // const char *const *column_family_names,
// // const rocksdb_options_t *const *column_family_options,
// // rocksdb_column_family_handle_t **column_family_handles, char **errptr);
// for (int i = 0; i < 100; i++) {
// char buf[128] = {0};
// rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create();
// KV kv = {.k1 = i, .k2 = i};
// kvSerial(&kv, buf);
// rocksdb_put_cf(db, wopt, pHandle[0], buf, strlen(buf), (const char *)&i, sizeof(i), &err);
// }
// rocksdb_close(db);
// Write
// rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create();
// rocksdb_put(db, writeoptions, "key", 3, "value", 5, &err);
//// Read
// rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
// rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db));
// size_t vallen = 0;
// char *val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err);
// printf("val:%s\n", val);
//// Update
//// rocksdb_put(db, writeoptions, "key", 3, "eulav", 5, &err);
//// Delete
// rocksdb_delete(db, writeoptions, "key", 3, &err);
//// Read again
// val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err);
// printf("val:%s\n", val);
// rocksdb_close(db);
return
0
;
}
\ No newline at end of file
include/libs/stream/streamState.h
浏览文件 @
2d37aeab
...
...
@@ -32,7 +32,13 @@ typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2);
typedef
struct
STdbState
{
SStreamTask
*
pOwner
;
rocksdb_t
*
rocksdb
;
rocksdb_t
*
rocksdb
;
rocksdb_column_family_handle_t
**
pHandle
;
// rocksdb_column_family_handle_t* fillStateDB;
// rocksdb_column_family_handle_t* sessStateDB;
// rocksdb_column_family_handle_t* funcStateDB;
// rocksdb_column_family_handle_t* parnameStateDB;
// rocksdb_column_family_handle_t* partagStateDB;
TDB
*
db
;
TTB
*
pStateDb
;
...
...
source/libs/stream/src/streamState.c
浏览文件 @
2d37aeab
...
...
@@ -14,9 +14,13 @@
*/
#include "streamState.h"
#include <bits/stdint-uintn.h>
#include <string.h>
#include "executor.h"
#include "osMemory.h"
#include "rocksdb/c.h"
#include "streamInc.h"
#include "tcoding.h"
#include "tcommon.h"
#include "tcompare.h"
#include "ttimer.h"
...
...
@@ -109,13 +113,143 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
}
//
//
//
SStateKey
// |--groupid--|---ts------|--opNum----|
// |--uint64_t-|-uint64_t--|--int64_t--|
//
//
//
int
compareState
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
)
{
return
-
1
;
}
int
stateKeyDBComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
)
{
SStateKey
key1
,
key2
;
memset
(
&
key1
,
0
,
sizeof
(
key1
));
memset
(
&
key2
,
0
,
sizeof
(
key2
));
char
*
p1
=
(
char
*
)
aBuf
;
char
*
p2
=
(
char
*
)
bBuf
;
p1
=
taosDecodeFixedU64
(
p1
,
&
key1
.
key
.
groupId
);
p2
=
taosDecodeFixedU64
(
p2
,
&
key2
.
key
.
groupId
);
p1
=
taosDecodeFixedI64
(
p1
,
&
key1
.
key
.
ts
);
p2
=
taosDecodeFixedI64
(
p2
,
&
key2
.
key
.
ts
);
taosDecodeFixedI64
(
p1
,
&
key1
.
opNum
);
taosDecodeFixedI64
(
p2
,
&
key2
.
opNum
);
return
stateKeyCmpr
(
&
key1
,
sizeof
(
key1
),
&
key2
,
sizeof
(
key2
));
}
int
stateKeySerial
(
SStateKey
*
key
,
char
*
buf
)
{
int
len
=
0
;
len
+=
taosEncodeFixedU64
((
void
**
)
&
buf
,
key
->
key
.
groupId
);
len
+=
taosEncodeFixedI64
((
void
**
)
&
buf
,
key
->
key
.
ts
);
len
+=
taosEncodeFixedU64
((
void
**
)
&
buf
,
key
->
opNum
);
return
len
;
}
//
// SStateSessionKey
// |-----------SSessionKey----------|
// |-----STimeWindow-----|
// |---skey--|---ekey----|--groupId-|--opNum--|
// |---int64-|--int64_t--|--uint64--|--int64_t|
// |
//
int
stateSessionKeyDBComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
)
{
SStateSessionKey
w1
,
w2
;
memset
(
&
w1
,
0
,
sizeof
(
w1
));
memset
(
&
w2
,
0
,
sizeof
(
w2
));
char
*
p1
=
(
char
*
)
aBuf
;
char
*
p2
=
(
char
*
)
bBuf
;
p1
=
taosDecodeFixedI64
(
p1
,
&
w1
.
key
.
win
.
skey
);
p2
=
taosDecodeFixedI64
(
p2
,
&
w2
.
key
.
win
.
skey
);
p1
=
taosDecodeFixedI64
(
p1
,
&
w1
.
key
.
win
.
ekey
);
p2
=
taosDecodeFixedI64
(
p2
,
&
w2
.
key
.
win
.
ekey
);
p1
=
taosDecodeFixedU64
(
p1
,
&
w1
.
key
.
groupId
);
p2
=
taosDecodeFixedU64
(
p2
,
&
w2
.
key
.
groupId
);
p1
=
taosDecodeFixedI64
(
p1
,
&
w1
.
opNum
);
p2
=
taosDecodeFixedI64
(
p2
,
&
w2
.
opNum
);
return
stateSessionKeyCmpr
(
&
w1
,
sizeof
(
w1
),
&
w2
,
sizeof
(
w2
));
}
int
stateSessionKeySerial
(
SStateSessionKey
*
sess
,
char
*
buf
)
{
int
len
=
0
;
len
+=
taosEncodeFixedI64
((
void
**
)
&
buf
,
sess
->
key
.
win
.
skey
);
len
+=
taosEncodeFixedI64
((
void
**
)
&
buf
,
sess
->
key
.
win
.
ekey
);
len
+=
taosEncodeFixedU64
((
void
**
)
&
buf
,
sess
->
key
.
groupId
);
len
+=
taosEncodeFixedI64
((
void
**
)
&
buf
,
sess
->
opNum
);
return
len
;
}
/**
* SWinKey
* |------groupId------|-----ts------|
* |------uint64-------|----int64----|
*/
int
winKeyDBComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
)
{
SWinKey
w1
,
w2
;
memset
(
&
w1
,
0
,
sizeof
(
w1
));
memset
(
&
w2
,
0
,
sizeof
(
w2
));
char
*
p1
=
(
char
*
)
aBuf
;
char
*
p2
=
(
char
*
)
bBuf
;
p1
=
taosDecodeFixedU64
(
p1
,
&
w1
.
groupId
);
p2
=
taosDecodeFixedU64
(
p2
,
&
w2
.
groupId
);
p1
=
taosDecodeFixedI64
(
p1
,
&
w1
.
ts
);
p2
=
taosDecodeFixedI64
(
p2
,
&
w2
.
ts
);
return
winKeyCmpr
(
&
w1
,
sizeof
(
w1
),
&
w2
,
sizeof
(
w2
));
}
int
winKeySerial
(
SWinKey
*
key
,
char
*
buf
)
{
int
len
=
0
;
len
+=
taosEncodeFixedU64
((
void
**
)
&
buf
,
key
->
groupId
);
len
+=
taosEncodeFixedI64
((
void
**
)
&
buf
,
key
->
ts
);
return
len
;
}
/*
* STupleKey
* |---groupId---|---ts---|---exprIdx---|
* |---uint64--|---int64--|---int32-----|
*/
int
tupleKeyDBComp
(
void
*
state
,
const
char
*
aBuf
,
size_t
aLen
,
const
char
*
bBuf
,
size_t
bLen
)
{
STupleKey
w1
,
w2
;
memset
(
&
w1
,
0
,
sizeof
(
w1
));
memset
(
&
w2
,
0
,
sizeof
(
w2
));
char
*
p1
=
(
char
*
)
aBuf
;
char
*
p2
=
(
char
*
)
bBuf
;
p1
=
taosDecodeFixedU64
(
p1
,
&
w1
.
groupId
);
p2
=
taosDecodeFixedU64
(
p2
,
&
w2
.
groupId
);
p1
=
taosDecodeFixedI64
(
p1
,
&
w1
.
ts
);
p2
=
taosDecodeFixedI64
(
p2
,
&
w2
.
ts
);
p1
=
taosDecodeFixedI32
(
p1
,
&
w1
.
exprIdx
);
p2
=
taosDecodeFixedI32
(
p2
,
&
w2
.
exprIdx
);
return
STupleKeyCmpr
(
&
w1
,
sizeof
(
w1
),
&
w2
,
sizeof
(
w2
));
}
int
tupleKeySerial
(
STupleKey
*
key
,
char
*
buf
)
{
int
len
=
0
;
len
+=
taosEncodeFixedU64
((
void
**
)
&
buf
,
key
->
groupId
);
len
+=
taosEncodeFixedI64
((
void
**
)
&
buf
,
key
->
ts
);
len
+=
taosEncodeFixedI32
((
void
**
)
&
buf
,
key
->
exprIdx
);
return
len
;
}
const
char
*
cfName
[]
=
{
"default"
,
"fill"
,
"sess"
,
"func"
,
"parname"
,
"partag"
};
const
char
*
compareStateName
(
void
*
name
)
{
return
NULL
;
}
int
streamInitBackend
(
SStreamState
*
pState
,
char
*
path
)
{
rocksdb_options_t
*
opts
=
rocksdb_options_create
();
...
...
@@ -123,30 +257,46 @@ int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_options_optimize_level_style_compaction
(
opts
,
0
);
// create the DB if it's not already present
rocksdb_options_set_create_if_missing
(
opts
,
1
);
rocksdb_options_set_create_missing_column_families
(
opts
,
1
);
rocksdb_comparator_t
*
cmp1
=
rocksdb_comparator_create
(
NULL
,
NULL
,
compareState
,
compareStateName
)
;
rocksdb_comparator_t
*
cmp2
=
rocksdb_comparator_create
(
NULL
,
NULL
,
compareState
,
compareStateName
);
char
*
err
=
NULL
;
int
cfLen
=
sizeof
(
cfName
)
/
sizeof
(
cfName
[
0
]
);
char
*
err
=
NULL
;
rocksdb_t
*
db
=
rocksdb_open
(
opts
,
path
,
&
err
);
if
(
err
==
NULL
)
{
pState
->
pTdbState
->
rocksdb
=
db
;
const
rocksdb_options_t
**
cfOpt
=
taosMemoryCalloc
(
cfLen
,
sizeof
(
rocksdb_options_t
*
));
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
cfOpt
[
i
]
=
rocksdb_options_create_copy
(
opts
);
}
rocksdb_
options_t
*
dbOpts1
=
rocksdb_options_create_copy
(
opts
);
rocksdb_options_
t
*
dbOpts2
=
rocksdb_options_create_copy
(
opts
);
rocksdb_
comparator_t
*
fillCompare
=
rocksdb_comparator_create
(
NULL
,
NULL
,
stateKeyDBComp
,
compareStateName
);
rocksdb_options_
set_comparator
((
rocksdb_options_t
*
)
cfOpt
[
1
],
fillCompare
);
rocksdb_
options_set_comparator
(
dbOpts1
,
cmp1
);
rocksdb_options_set_comparator
(
dbOpts2
,
cmp2
);
rocksdb_
comparator_t
*
sessCompare
=
rocksdb_comparator_create
(
NULL
,
NULL
,
stateKeyDBComp
,
compareStateName
);
rocksdb_options_set_comparator
(
(
rocksdb_options_t
*
)
cfOpt
[
2
],
sessCompare
);
rocksdb_co
lumn_family_handle_t
*
cf1
=
rocksdb_create_column_family
(
db
,
dbOpts1
,
"cmp1"
,
&
err
);
rocksdb_
column_family_handle_t
*
cf2
=
rocksdb_create_column_family
(
db
,
dbOpts2
,
"cmp2"
,
&
err
);
rocksdb_co
mparator_t
*
funcCompare
=
rocksdb_comparator_create
(
NULL
,
NULL
,
stateKeyDBComp
,
compareStateName
);
rocksdb_
options_set_comparator
((
rocksdb_options_t
*
)
cfOpt
[
3
],
funcCompare
);
rocksdb_
writebatch_t
*
wp
=
rocksdb_writeoptions_create
(
);
rocksdb_
put_cf
(
db
,
wp
,
cf1
,
NULL
,
0
,
NULL
,
0
,
&
err
);
rocksdb_
comparator_t
*
parnameCompare
=
rocksdb_comparator_create
(
NULL
,
NULL
,
stateKeyDBComp
,
compareStateName
);
rocksdb_
options_set_comparator
((
rocksdb_options_t
*
)
cfOpt
[
4
],
parnameCompare
);
rocksdb_comparator_t
*
partagCompare
=
rocksdb_comparator_create
(
NULL
,
NULL
,
stateKeyDBComp
,
compareStateName
);
rocksdb_options_set_comparator
((
rocksdb_options_t
*
)
cfOpt
[
5
],
partagCompare
);
rocksdb_column_family_handle_t
**
cfHandle
=
taosMemoryMalloc
(
cfLen
*
sizeof
(
rocksdb_column_family_handle_t
*
));
rocksdb_t
*
db
=
rocksdb_open_column_families
(
opts
,
"rocksdb"
,
cfLen
,
cfName
,
cfOpt
,
cfHandle
,
&
err
);
pState
->
pTdbState
->
rocksdb
=
db
;
pState
->
pTdbState
->
pHandle
=
cfHandle
;
return
0
;
}
void
streamCleanBackend
(
SStreamState
*
pState
)
{
int
cfLen
=
sizeof
(
cfName
)
/
sizeof
(
cfName
[
0
]);
for
(
int
i
=
0
;
i
<
cfLen
;
i
++
)
{
rocksdb_column_family_handle_destroy
(
pState
->
pTdbState
->
pHandle
[
i
]);
}
rocksdb_close
(
pState
->
pTdbState
->
rocksdb
);
}
SStreamState
*
streamStateOpen
(
char
*
path
,
SStreamTask
*
pTask
,
bool
specPath
,
int32_t
szPage
,
int32_t
pages
)
{
SStreamState
*
pState
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamState
));
if
(
pState
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录