Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
d93d1093
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d93d1093
编写于
3月 22, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-15] refact the interface of sdb
上级
47029bc3
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
435 addition
and
402 deletion
+435
-402
src/mnode/inc/mgmtSdb.h
src/mnode/inc/mgmtSdb.h
+22
-21
src/mnode/src/mgmtSdb.c
src/mnode/src/mgmtSdb.c
+413
-381
未找到文件。
src/mnode/inc/mgmtSdb.h
浏览文件 @
d93d1093
...
@@ -21,46 +21,47 @@ extern "C" {
...
@@ -21,46 +21,47 @@ extern "C" {
#endif
#endif
typedef
enum
{
typedef
enum
{
SDB_KEYTYPE_STRING
,
SDB_KEY_TYPE_STRING
,
SDB_KEYTYPE_AUTO
,
SDB_KEY_TYPE_AUTO
SDB_KEYTYPE_MAX
}
ESdbKeyType
;
}
ESdbKeyType
;
typedef
enum
{
typedef
enum
{
SDB_OPER_GLOBAL
,
SDB_OPER_TYPE_GLOBAL
,
SDB_OPER_LOCAL
,
SDB_OPER_TYPE_LOCAL
SDB_OPER_DISK
}
ESdbOperType
;
}
ESdbOperType
;
enum
_sdbaction
{
typedef
struct
{
SDB_TYPE_INSERT
,
ESdbOperType
type
;
SDB_TYPE_DELETE
,
int32_t
maxRowSize
;
SDB_TYPE_UPDATE
,
int32_t
rowSize
;
}
ESdbForwardType
;
void
*
rowData
;
void
*
pObj
;
void
*
table
;
int64_t
version
;
}
SSdbOperDesc
;
typedef
struct
{
typedef
struct
{
char
*
tableName
;
char
*
tableName
;
int32_t
hashSessions
;
int32_t
hashSessions
;
int32_t
maxRowSize
;
int32_t
maxRowSize
;
ESdbKeyType
keyType
;
ESdbKeyType
keyType
;
int32_t
(
*
insertFp
)(
void
*
pObj
);
int32_t
(
*
insertFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
deleteFp
)(
void
*
pObj
);
int32_t
(
*
deleteFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
updateFp
)(
void
*
pObj
);
int32_t
(
*
updateFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
encodeFp
)(
void
*
pObj
,
void
*
pData
,
int32_t
maxRowSize
);
int32_t
(
*
encodeFp
)(
SSdbOperDesc
*
pOper
);
void
*
(
*
decodeFp
)(
void
*
pData
);
int32_t
(
*
decodeFp
)(
SSdbOperDesc
*
pDesc
);
int32_t
(
*
destroyFp
)(
void
*
pObj
);
int32_t
(
*
destroyFp
)(
SSdbOperDesc
*
pDesc
);
}
SSdbTableDesc
;
}
SSdbTableDesc
;
void
*
sdbOpenTable
(
SSdbTableDesc
*
desc
);
void
*
sdbOpenTable
(
SSdbTableDesc
*
desc
);
void
sdbCloseTable
(
void
*
handle
);
void
sdbCloseTable
(
void
*
handle
);
int32_t
sdbInsertRow
(
void
*
handle
,
void
*
row
,
ESdbOperType
o
per
);
int32_t
sdbInsertRow
(
SSdbOperDesc
*
pO
per
);
int32_t
sdbDeleteRow
(
void
*
handle
,
void
*
key
,
ESdbOperType
o
per
);
int32_t
sdbDeleteRow
(
SSdbOperDesc
*
pO
per
);
int32_t
sdbUpdateRow
(
void
*
handle
,
void
*
row
,
int32_t
rowSize
,
ESdbOperType
o
per
);
int32_t
sdbUpdateRow
(
SSdbOperDesc
*
pO
per
);
void
*
sdbGetRow
(
void
*
handle
,
void
*
key
);
void
*
sdbGetRow
(
void
*
handle
,
void
*
key
);
void
*
sdbFetchRow
(
void
*
handle
,
void
*
pNode
,
void
**
ppRow
);
void
*
sdbFetchRow
(
void
*
handle
,
void
*
pNode
,
void
**
ppRow
);
int64_t
sdbGetId
(
void
*
handle
);
int64_t
sdbGetNumOfRows
(
void
*
handle
);
int64_t
sdbGetNumOfRows
(
void
*
handle
);
uint64_t
sdbGetVersion
();
uint64_t
sdbGetVersion
();
...
...
src/mnode/src/mgmtSdb.c
浏览文件 @
d93d1093
...
@@ -16,6 +16,7 @@
...
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "os.h"
#include "os.h"
#include "taosdef.h"
#include "taosdef.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tchecksum.h"
#include "tglobalcfg.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "tlog.h"
...
@@ -39,29 +40,29 @@ typedef struct {
...
@@ -39,29 +40,29 @@ typedef struct {
typedef
struct
_SSdbTable
{
typedef
struct
_SSdbTable
{
SSdbHeader
header
;
SSdbHeader
header
;
char
n
ame
[
TSDB_DB_NAME_LEN
];
char
tableN
ame
[
TSDB_DB_NAME_LEN
];
char
f
n
[
TSDB_FILENAME_LEN
];
char
f
ileName
[
TSDB_FILENAME_LEN
];
ESdbKeyType
keyType
;
ESdbKeyType
keyType
;
int32_t
db
Id
;
int32_t
table
Id
;
int32_t
hashSessions
;
int32_t
hashSessions
;
int32_t
maxRowSize
;
int32_t
maxRowSize
;
uint32_t
autoIndex
;
int32_t
autoIndex
;
int32_t
fd
;
int64_t
numOfRows
;
int64_t
numOfRows
;
int64_t
id
;
int64_t
version
;
int64_t
s
ize
;
int64_t
fileS
ize
;
void
*
iHandle
;
void
*
iHandle
;
int32_t
fd
;
int32_t
(
*
insertFp
)(
SSdbOperDesc
*
pDesc
);
int32_t
(
*
insertFp
)(
void
*
pObj
);
int32_t
(
*
deleteFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
deleteFp
)(
void
*
pObj
);
int32_t
(
*
updateFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
updateFp
)(
void
*
pObj
);
int32_t
(
*
decodeFp
)(
SSdbOperDesc
*
pOper
);
void
*
(
*
decodeFp
)(
void
*
pData
);
// return pObj
int32_t
(
*
encodeFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
encodeFp
)(
void
*
pObj
,
void
*
pData
,
int32_t
maxRowSize
);
// return size of pData
int32_t
(
*
destroyFp
)(
SSdbOperDesc
*
pOper
);
int32_t
(
*
destroyFp
)(
void
*
pObj
);
pthread_mutex_t
mutex
;
pthread_mutex_t
mutex
;
}
SSdbTable
;
}
SSdbTable
;
typedef
struct
{
typedef
struct
{
int64_t
id
;
int64_t
version
;
int64_t
offset
;
int64_t
offset
;
int32_t
rowSize
;
int32_t
rowSize
;
void
*
row
;
void
*
row
;
...
@@ -70,21 +71,27 @@ typedef struct {
...
@@ -70,21 +71,27 @@ typedef struct {
typedef
struct
{
typedef
struct
{
int32_t
delimiter
;
int32_t
delimiter
;
int32_t
rowSize
;
int32_t
rowSize
;
int64_t
id
;
int64_t
version
;
char
data
[];
char
data
[];
}
SRowHead
;
}
SRowHead
;
typedef
enum
{
SDB_FORWARD_TYPE_INSERT
,
SDB_FORWARD_TYPE_DELETE
,
SDB_FORWARD_TYPE_UPDATE
}
ESdbForwardType
;
typedef
struct
{
typedef
struct
{
uint8_t
dbId
;
ESdbForwardType
type
;
int
8_t
type
;
int
32_t
tableId
;
int
16_t
dataLe
n
;
int
64_t
versio
n
;
uint64_t
version
;
int32_t
rowSize
;
char
data
[]
;
void
*
rowData
;
}
SForwardMsg
;
}
SForwardMsg
;
extern
char
version
[];
extern
char
version
[];
const
int16_t
sdbFileVersion
=
2
;
const
int16_t
sdbFileVersion
=
2
;
int32_t
(
*
mpeerForwardRequestFp
)(
S
SdbTable
*
pTable
,
char
type
,
void
*
cont
,
int32_t
contLen
)
=
NULL
;
int32_t
(
*
mpeerForwardRequestFp
)(
S
ForwardMsg
*
forwardMsg
)
=
NULL
;
static
SSdbTable
*
sdbTableList
[
10
]
=
{
0
};
static
SSdbTable
*
sdbTableList
[
10
]
=
{
0
};
static
int32_t
sdbNumOfTables
=
0
;
static
int32_t
sdbNumOfTables
=
0
;
...
@@ -101,25 +108,36 @@ void sdbResetTable(SSdbTable *pTable);
...
@@ -101,25 +108,36 @@ void sdbResetTable(SSdbTable *pTable);
void
sdbSaveSnapShot
(
void
*
handle
);
void
sdbSaveSnapShot
(
void
*
handle
);
uint64_t
sdbGetVersion
()
{
return
sdbVersion
;
}
uint64_t
sdbGetVersion
()
{
return
sdbVersion
;
}
int64_t
sdbGetId
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
id
;
}
int64_t
sdbGetId
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
version
;
}
int64_t
sdbGetNumOfRows
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
numOfRows
;
}
int64_t
sdbGetNumOfRows
(
void
*
handle
)
{
return
((
SSdbTable
*
)
handle
)
->
numOfRows
;
}
static
int32_t
sdbForwardDbReqToPeer
(
SSdbTable
*
pTable
,
char
type
,
char
*
data
,
int32_t
dataLen
)
{
static
char
*
sdbGetkeyStr
(
SSdbTable
*
pTable
,
void
*
row
)
{
static
char
str
[
16
];
switch
(
pTable
->
keyType
)
{
case
SDB_KEY_TYPE_STRING
:
return
(
char
*
)
row
;
case
SDB_KEY_TYPE_AUTO
:
sprintf
(
str
,
"%d"
,
*
(
int32_t
*
)
row
);
return
str
;
default:
return
"unknown"
;
}
}
static
int32_t
sdbForwardDbReqToPeer
(
SForwardMsg
*
forwardMsg
)
{
if
(
mpeerForwardRequestFp
)
{
if
(
mpeerForwardRequestFp
)
{
return
mpeerForwardRequestFp
(
pTable
,
type
,
data
,
dataLen
);
return
mpeerForwardRequestFp
(
forwardMsg
);
}
else
{
}
else
{
return
0
;
return
0
;
}
}
}
}
static
void
sdbFinishCommit
(
void
*
handle
)
{
static
void
sdbFinishCommit
(
SSdbTable
*
pTable
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
uint32_t
sdbEcommit
=
SDB_ENDCOMMIT
;
uint32_t
sdbEcommit
=
SDB_ENDCOMMIT
;
off_t
offset
=
lseek
(
pTable
->
fd
,
0
,
SEEK_END
);
off_t
offset
=
lseek
(
pTable
->
fd
,
0
,
SEEK_END
);
assert
(
offset
==
pTable
->
s
ize
);
assert
(
offset
==
pTable
->
fileS
ize
);
twrite
(
pTable
->
fd
,
&
sdbEcommit
,
sizeof
(
sdbEcommit
));
twrite
(
pTable
->
fd
,
&
sdbEcommit
,
sizeof
(
sdbEcommit
));
pTable
->
s
ize
+=
sizeof
(
sdbEcommit
);
pTable
->
fileS
ize
+=
sizeof
(
sdbEcommit
);
}
}
static
int32_t
sdbOpenSdbFile
(
SSdbTable
*
pTable
)
{
static
int32_t
sdbOpenSdbFile
(
SSdbTable
*
pTable
)
{
...
@@ -136,40 +154,40 @@ static int32_t sdbOpenSdbFile(SSdbTable *pTable) {
...
@@ -136,40 +154,40 @@ static int32_t sdbOpenSdbFile(SSdbTable *pTable) {
// check sdb.db and .sdb.db status
// check sdb.db and .sdb.db status
char
fn
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
char
fn
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
dirc
=
strdup
(
pTable
->
f
n
);
dirc
=
strdup
(
pTable
->
f
ileName
);
basec
=
strdup
(
pTable
->
f
n
);
basec
=
strdup
(
pTable
->
f
ileName
);
sprintf
(
fn
,
"%s/.%s"
,
dirname
(
dirc
),
basename
(
basec
));
sprintf
(
fn
,
"%s/.%s"
,
dirname
(
dirc
),
basename
(
basec
));
tfree
(
dirc
);
tfree
(
dirc
);
tfree
(
basec
);
tfree
(
basec
);
if
(
stat
(
fn
,
&
ofstat
)
==
0
)
{
// .sdb.db file exists
if
(
stat
(
fn
,
&
ofstat
)
==
0
)
{
// .sdb.db file exists
if
(
stat
(
pTable
->
f
n
,
&
fstat
)
==
0
)
{
if
(
stat
(
pTable
->
f
ileName
,
&
fstat
)
==
0
)
{
remove
(
fn
);
remove
(
fn
);
}
else
{
}
else
{
remove
(
pTable
->
f
n
);
remove
(
pTable
->
f
ileName
);
rename
(
fn
,
pTable
->
f
n
);
rename
(
fn
,
pTable
->
f
ileName
);
}
}
}
}
pTable
->
fd
=
open
(
pTable
->
f
n
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
pTable
->
fd
=
open
(
pTable
->
f
ileName
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
if
(
pTable
->
fd
<
0
)
{
if
(
pTable
->
fd
<
0
)
{
sdbError
(
"
failed to open file:%s"
,
pTable
->
fn
);
sdbError
(
"
table:%s, failed to open file:%s"
,
pTable
->
tableName
,
pTable
->
fileName
);
return
-
1
;
return
-
1
;
}
}
pTable
->
s
ize
=
0
;
pTable
->
fileS
ize
=
0
;
stat
(
pTable
->
f
n
,
&
fstat
);
stat
(
pTable
->
f
ileName
,
&
fstat
);
size
=
sizeof
(
pTable
->
header
);
size
=
sizeof
(
pTable
->
header
);
if
(
fstat
.
st_size
==
0
)
{
if
(
fstat
.
st_size
==
0
)
{
pTable
->
header
.
swVersion
=
swVersion
.
iversion
;
pTable
->
header
.
swVersion
=
swVersion
.
iversion
;
pTable
->
header
.
sdbFileVersion
=
sdbFileVersion
;
pTable
->
header
.
sdbFileVersion
=
sdbFileVersion
;
if
(
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)(
&
pTable
->
header
),
size
)
<
0
)
{
if
(
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)(
&
pTable
->
header
),
size
)
<
0
)
{
sdbError
(
"
failed to get file header checksum, file:%s"
,
pTable
->
fn
);
sdbError
(
"
table:%s, failed to get file header checksum, file:%s"
,
pTable
->
tableName
,
pTable
->
fileName
);
tclose
(
pTable
->
fd
);
tclose
(
pTable
->
fd
);
return
-
1
;
return
-
1
;
}
}
twrite
(
pTable
->
fd
,
&
(
pTable
->
header
),
size
);
twrite
(
pTable
->
fd
,
&
(
pTable
->
header
),
size
);
pTable
->
s
ize
+=
size
;
pTable
->
fileS
ize
+=
size
;
sdbFinishCommit
(
pTable
);
sdbFinishCommit
(
pTable
);
}
else
{
}
else
{
uint32_t
sdbEcommit
=
0
;
uint32_t
sdbEcommit
=
0
;
...
@@ -186,25 +204,25 @@ static int32_t sdbOpenSdbFile(SSdbTable *pTable) {
...
@@ -186,25 +204,25 @@ static int32_t sdbOpenSdbFile(SSdbTable *pTable) {
ssize_t
tsize
=
read
(
pTable
->
fd
,
&
(
pTable
->
header
),
size
);
ssize_t
tsize
=
read
(
pTable
->
fd
,
&
(
pTable
->
header
),
size
);
if
(
tsize
<
size
)
{
if
(
tsize
<
size
)
{
sdbError
(
"
failed to read sdb file header, file:%s"
,
pTable
->
fn
);
sdbError
(
"
table:%s, failed to read sdb file header, file:%s"
,
pTable
->
tableName
,
pTable
->
fileName
);
tclose
(
pTable
->
fd
);
tclose
(
pTable
->
fd
);
return
-
1
;
return
-
1
;
}
}
if
(
pTable
->
header
.
swVersion
!=
swVersion
.
iversion
)
{
if
(
pTable
->
header
.
swVersion
!=
swVersion
.
iversion
)
{
sdbWarn
(
"
sdb file:%s version not match software version"
,
pTable
->
fn
);
sdbWarn
(
"
table:%s, sdb file:%s version not match software version"
,
pTable
->
tableName
,
pTable
->
fileName
);
}
}
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)(
&
pTable
->
header
),
size
))
{
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)(
&
pTable
->
header
),
size
))
{
sdbError
(
"
sdb file header is broken since checksum mismatch, file:%s"
,
pTable
->
fn
);
sdbError
(
"
table:%s, sdb file header is broken since checksum mismatch, file:%s"
,
pTable
->
tableName
,
pTable
->
fileName
);
tclose
(
pTable
->
fd
);
tclose
(
pTable
->
fd
);
return
-
1
;
return
-
1
;
}
}
pTable
->
s
ize
+=
size
;
pTable
->
fileS
ize
+=
size
;
// skip end commit symbol
// skip end commit symbol
lseek
(
pTable
->
fd
,
sizeof
(
sdbEcommit
),
SEEK_CUR
);
lseek
(
pTable
->
fd
,
sizeof
(
sdbEcommit
),
SEEK_CUR
);
pTable
->
s
ize
+=
sizeof
(
sdbEcommit
);
pTable
->
fileS
ize
+=
sizeof
(
sdbEcommit
);
}
}
pTable
->
numOfRows
=
0
;
pTable
->
numOfRows
=
0
;
...
@@ -213,106 +231,170 @@ static int32_t sdbOpenSdbFile(SSdbTable *pTable) {
...
@@ -213,106 +231,170 @@ static int32_t sdbOpenSdbFile(SSdbTable *pTable) {
}
}
static
int32_t
sdbInitTableByFile
(
SSdbTable
*
pTable
)
{
static
int32_t
sdbInitTableByFile
(
SSdbTable
*
pTable
)
{
SRowMeta
rowMeta
;
sdbTrace
(
"table:%s, open sdb file:%s for read"
,
pTable
->
tableName
,
pTable
->
fileName
);
int32_t
numOfDels
=
0
;
if
(
sdbOpenSdbFile
(
pTable
)
<
0
)
{
int32_t
bytes
=
0
;
sdbError
(
"table:%s, failed to open sdb file:%s for read"
,
pTable
->
tableName
,
pTable
->
fileName
);
int64_t
oldId
=
0
;
return
-
1
;
void
*
pMetaRow
=
NULL
;
}
int32_t
total_size
=
0
;
int32_t
real_size
=
0
;
int32_t
maxAutoIndex
=
0
;
if
(
sdbOpenSdbFile
(
pTable
)
<
0
)
return
-
1
;
total_size
=
sizeof
(
SRowHead
)
+
pTable
->
maxRowSize
+
sizeof
(
TSCKSUM
);
int32_t
total_size
=
sizeof
(
SRowHead
)
+
pTable
->
maxRowSize
+
sizeof
(
TSCKSUM
);
SRowHead
*
rowHead
=
(
SRowHead
*
)
malloc
(
total_size
);
SRowHead
*
rowHead
=
(
SRowHead
*
)
malloc
(
total_size
);
if
(
rowHead
==
NULL
)
{
if
(
rowHead
==
NULL
)
{
sdbError
(
"
failed to allocate row head memory, sdb:%s"
,
pTable
->
n
ame
);
sdbError
(
"
table:%s, failed to allocate row head memory, sdb:%s"
,
pTable
->
tableName
,
pTable
->
tableN
ame
);
return
-
1
;
return
-
1
;
}
}
sdbTrace
(
"open sdb file:%s for read"
,
pTable
->
fn
)
;
int32_t
numOfChanged
=
0
;
int32_t
maxAutoIndex
=
0
;
while
(
1
)
{
while
(
1
)
{
memset
(
rowHead
,
0
,
total_size
);
memset
(
rowHead
,
0
,
total_size
);
bytes
=
read
(
pTable
->
fd
,
rowHead
,
sizeof
(
SRowHead
));
int32_t
bytes
=
read
(
pTable
->
fd
,
rowHead
,
sizeof
(
SRowHead
));
if
(
bytes
<
0
)
{
if
(
bytes
<
0
)
{
sdbError
(
"failed to read sdb file:%s"
,
pTable
->
fn
);
sdbError
(
"table:%s, failed to read sdb file:%s"
,
pTable
->
tableName
,
pTable
->
fileName
);
goto
sdb_exit1
;
tfree
(
rowHead
);
return
-
1
;
}
}
if
(
bytes
==
0
)
break
;
if
(
bytes
==
0
)
break
;
if
(
bytes
<
sizeof
(
SRowHead
)
||
rowHead
->
delimiter
!=
SDB_DELIMITER
)
{
if
(
bytes
<
sizeof
(
SRowHead
)
||
rowHead
->
delimiter
!=
SDB_DELIMITER
)
{
pTable
->
s
ize
++
;
pTable
->
fileS
ize
++
;
lseek
(
pTable
->
fd
,
-
(
bytes
-
1
),
SEEK_CUR
);
lseek
(
pTable
->
fd
,
-
(
bytes
-
1
),
SEEK_CUR
);
continue
;
continue
;
}
}
if
(
rowHead
->
rowSize
<
0
||
rowHead
->
rowSize
>
pTable
->
maxRowSize
)
{
if
(
rowHead
->
rowSize
<
0
||
rowHead
->
rowSize
>
pTable
->
maxRowSize
)
{
sdbError
(
"
error row size in sdb file:%s, id:%d rowSize:%d maxRowSize:%d"
,
sdbError
(
"
table:%s, error row size in sdb filesize:%d, version:%d rowSize:%d maxRowSize:%d"
,
pTable
->
tableName
,
pTable
->
fn
,
rowHead
->
id
,
rowHead
->
rowSize
,
pTable
->
maxRowSize
);
pTable
->
fileSize
,
rowHead
->
version
,
rowHead
->
rowSize
,
pTable
->
maxRowSize
);
pTable
->
s
ize
+=
sizeof
(
SRowHead
);
pTable
->
fileS
ize
+=
sizeof
(
SRowHead
);
continue
;
continue
;
}
}
bytes
=
read
(
pTable
->
fd
,
rowHead
->
data
,
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
));
bytes
=
read
(
pTable
->
fd
,
rowHead
->
data
,
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
));
if
(
bytes
<
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
))
{
if
(
bytes
<
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
))
{
// TODO: Here may cause pTable->size not end of the file
// TODO: Here may cause pTable->fileSize not end of the file
sdbError
(
"failed to read sdb file:%s id:%d rowSize:%d"
,
pTable
->
fn
,
rowHead
->
id
,
rowHead
->
rowSize
);
sdbError
(
"table:%s, failed to read sdb file, version:%d rowSize:%d"
,
pTable
->
tableName
,
rowHead
->
version
,
rowHead
->
rowSize
);
break
;
break
;
}
}
real_size
=
sizeof
(
SRowHead
)
+
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
);
int32_t
real_size
=
sizeof
(
SRowHead
)
+
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
);
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
rowHead
,
real_size
))
{
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
rowHead
,
real_size
))
{
sdbError
(
"
error sdb checksum, sdb:%s id:%d, skip"
,
pTable
->
name
,
rowHead
->
id
);
sdbError
(
"
table:%s, error sdb checksum, version:%d, skip"
,
pTable
->
tableName
,
rowHead
->
version
);
pTable
->
s
ize
+=
real_size
;
pTable
->
fileS
ize
+=
real_size
;
continue
;
continue
;
}
}
if
(
pTable
->
keyType
==
SDB_KEYTYPE_AUTO
)
{
if
(
pTable
->
keyType
==
SDB_KEY
_
TYPE_AUTO
)
{
maxAutoIndex
=
MAX
(
maxAutoIndex
,
*
(
int32_t
*
)
rowHead
->
data
);
maxAutoIndex
=
MAX
(
maxAutoIndex
,
*
(
int32_t
*
)
rowHead
->
data
);
}
}
pMetaRow
=
sdbGetRow
(
pTable
,
rowHead
->
data
);
void
*
pMetaRow
=
sdbGetRow
(
pTable
,
rowHead
->
data
);
if
(
pMetaRow
==
NULL
)
{
if
(
pMetaRow
==
NULL
)
{
if
(
rowHead
->
id
<
0
)
{
if
(
rowHead
->
version
<
0
)
{
sdbError
(
"error sdb negative id:%d, sdb:%s, skip"
,
rowHead
->
id
,
pTable
->
name
);
sdbError
(
"table:%s, error sdb negative version:%d, record:%s, skip"
,
pTable
->
tableName
,
rowHead
->
version
,
sdbGetkeyStr
(
pTable
,
rowHead
->
data
));
}
else
{
SRowMeta
rowMeta
;
rowMeta
.
version
=
rowHead
->
version
;
rowMeta
.
offset
=
pTable
->
fileSize
;
rowMeta
.
rowSize
=
rowHead
->
rowSize
;
SSdbOperDesc
oper
=
{
.
table
=
pTable
,
.
rowData
=
rowHead
->
data
,
.
rowSize
=
rowHead
->
rowSize
};
int32_t
code
=
(
*
pTable
->
decodeFp
)(
&
oper
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
rowMeta
.
row
=
oper
.
pObj
;
(
*
sdbAddIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
rowMeta
.
row
,
&
rowMeta
);
pTable
->
numOfRows
++
;
sdbTrace
(
"table:%s, read record:%s and insert, numOfRows:%d version:%"
PRId64
" sdbversion:%"
PRId64
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
rowHead
->
data
),
pTable
->
numOfRows
,
pTable
->
version
,
sdbVersion
);
}
else
{
}
else
{
sdbInsertRow
(
pTable
,
rowHead
->
data
,
SDB_OPER_DISK
);
sdbError
(
"table:%s, failed to decode record:%s, numOfRows:%d version:%"
PRId64
" sdbversion:%"
PRId64
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
rowHead
->
data
),
pTable
->
numOfRows
,
pTable
->
version
,
sdbVersion
);
}
}
}
}
else
{
if
(
rowHead
->
version
<
0
)
{
SSdbOperDesc
oper
=
{
.
table
=
pTable
,
.
pObj
=
pMetaRow
};
(
*
pTable
->
destroyFp
)(
&
oper
);
(
*
sdbDeleteIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
rowHead
->
data
);
pTable
->
numOfRows
--
;
sdbTrace
(
"table:%s, read record:%s and delete, numOfRows:%d version:%"
PRId64
" sdbversion:%"
PRId64
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
rowHead
->
data
),
pTable
->
numOfRows
,
pTable
->
version
,
sdbVersion
);
}
else
{
}
else
{
if
(
rowHead
->
id
<
0
)
{
SRowMeta
rowMeta
;
sdbDeleteRow
(
pTable
,
rowHead
->
data
,
SDB_OPER_DISK
);
rowMeta
.
version
=
rowHead
->
version
;
rowMeta
.
offset
=
pTable
->
fileSize
;
rowMeta
.
rowSize
=
rowHead
->
rowSize
;
SSdbOperDesc
oper
=
{
.
table
=
pTable
,
.
rowData
=
rowHead
->
data
,
.
rowSize
=
rowHead
->
rowSize
,
.
pObj
=
pMetaRow
};
(
*
pTable
->
destroyFp
)(
&
oper
);
(
*
sdbDeleteIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
rowHead
->
data
);
int32_t
code
=
(
*
pTable
->
decodeFp
)(
&
oper
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
rowMeta
.
row
=
oper
.
pObj
;
(
*
sdbAddIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
rowMeta
.
row
,
&
rowMeta
);
sdbTrace
(
"table:%s, read record:%s and update, numOfRows:%d version:%"
PRId64
" sdbversion:%"
PRId64
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
rowHead
->
data
),
pTable
->
numOfRows
,
pTable
->
version
,
sdbVersion
);
}
else
{
}
else
{
sdbUpdateRow
(
pTable
,
rowHead
->
data
,
rowHead
->
rowSize
,
SDB_OPER_DISK
);
sdbError
(
"table:%s, failed to decode record:%s, numOfRows:%d version:%"
PRId64
" sdbversion:%"
PRId64
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
rowHead
->
data
),
pTable
->
numOfRows
,
pTable
->
version
,
sdbVersion
);
}
}
numOfChanged
++
;
if
(
pTable
->
version
<
abs
(
rowHead
->
version
))
{
pTable
->
version
=
abs
(
rowHead
->
version
);
}
}
numOfDels
++
;
}
}
pTable
->
size
+=
real_size
;
pTable
->
fileSize
+=
real_size
;
if
(
pTable
->
id
<
abs
(
rowHead
->
id
))
pTable
->
id
=
abs
(
rowHead
->
id
);
pTable
->
fileSize
+=
4
;
pTable
->
size
+=
4
;
lseek
(
pTable
->
fd
,
4
,
SEEK_CUR
);
lseek
(
pTable
->
fd
,
4
,
SEEK_CUR
);
}
}
if
(
pTable
->
keyType
==
SDB_KEYTYPE_AUTO
)
{
void
*
pNode
=
NULL
;
while
(
1
)
{
SRowMeta
*
pMeta
;
pNode
=
(
*
sdbFetchRowFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
pNode
,
(
void
**
)
&
pMeta
);
if
(
pMeta
==
NULL
)
break
;
SSdbOperDesc
oper
=
{
.
pObj
=
pMeta
->
row
,
.
table
=
pTable
,
.
version
=
pMeta
->
version
,
};
int32_t
code
=
(
*
pTable
->
insertFp
)(
&
oper
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
sdbError
(
"table:%s, failed to insert record:%s"
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
rowHead
->
data
));
}
}
sdbVersion
+=
pTable
->
version
;
if
(
pTable
->
keyType
==
SDB_KEY_TYPE_AUTO
)
{
pTable
->
autoIndex
=
maxAutoIndex
;
pTable
->
autoIndex
=
maxAutoIndex
;
}
}
sdbVersion
+=
pTable
->
id
;
if
(
numOfChanged
>
pTable
->
hashSessions
/
4
)
{
if
(
numOfDels
>
pTable
->
hashSessions
/
4
)
{
sdbSaveSnapShot
(
pTable
);
sdbSaveSnapShot
(
pTable
);
}
}
tfree
(
rowHead
);
tfree
(
rowHead
);
return
0
;
return
0
;
sdb_exit1:
tfree
(
rowHead
);
return
-
1
;
}
}
void
*
sdbOpenTable
(
SSdbTableDesc
*
pDesc
)
{
void
*
sdbOpenTable
(
SSdbTableDesc
*
pDesc
)
{
...
@@ -328,8 +410,8 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
...
@@ -328,8 +410,8 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
pTable
->
encodeFp
=
pDesc
->
encodeFp
;
pTable
->
encodeFp
=
pDesc
->
encodeFp
;
pTable
->
decodeFp
=
pDesc
->
decodeFp
;
pTable
->
decodeFp
=
pDesc
->
decodeFp
;
pTable
->
destroyFp
=
pDesc
->
destroyFp
;
pTable
->
destroyFp
=
pDesc
->
destroyFp
;
strcpy
(
pTable
->
n
ame
,
pDesc
->
tableName
);
strcpy
(
pTable
->
tableN
ame
,
pDesc
->
tableName
);
sprintf
(
pTable
->
f
n
,
"%s/%s.db"
,
tsMnodeDir
,
pTable
->
n
ame
);
sprintf
(
pTable
->
f
ileName
,
"%s/%s.db"
,
tsMnodeDir
,
pTable
->
tableN
ame
);
if
(
sdbInitIndexFp
[
pTable
->
keyType
]
!=
NULL
)
{
if
(
sdbInitIndexFp
[
pTable
->
keyType
]
!=
NULL
)
{
pTable
->
iHandle
=
(
*
sdbInitIndexFp
[
pTable
->
keyType
])(
pTable
->
maxRowSize
,
sizeof
(
SRowMeta
));
pTable
->
iHandle
=
(
*
sdbInitIndexFp
[
pTable
->
keyType
])(
pTable
->
maxRowSize
,
sizeof
(
SRowMeta
));
...
@@ -339,10 +421,11 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
...
@@ -339,10 +421,11 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
if
(
sdbInitTableByFile
(
pTable
)
<
0
)
return
NULL
;
if
(
sdbInitTableByFile
(
pTable
)
<
0
)
return
NULL
;
pTable
->
db
Id
=
sdbNumOfTables
++
;
pTable
->
table
Id
=
sdbNumOfTables
++
;
sdbTableList
[
pTable
->
db
Id
]
=
pTable
;
sdbTableList
[
pTable
->
table
Id
]
=
pTable
;
sdbTrace
(
"table:%s is initialized, numOfRows:%d, numOfTables:%d"
,
pTable
->
name
,
pTable
->
numOfRows
,
sdbNumOfTables
);
sdbTrace
(
"table:%s is initialized, numOfRows:%d, numOfTables:%d, version:%"
PRId64
" sdbversion:%"
PRId64
,
pTable
->
tableName
,
pTable
->
numOfRows
,
sdbNumOfTables
,
pTable
->
version
,
sdbVersion
);
return
pTable
;
return
pTable
;
}
}
...
@@ -375,323 +458,267 @@ void *sdbGetRow(void *handle, void *key) {
...
@@ -375,323 +458,267 @@ void *sdbGetRow(void *handle, void *key) {
return
pMeta
->
row
;
return
pMeta
->
row
;
}
}
int32_t
sdbInsertRow
(
void
*
handle
,
void
*
row
,
ESdbOperType
oper
)
{
int32_t
sdbInsertRow
(
SSdbOperDesc
*
pOper
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SSdbTable
*
pTable
=
(
SSdbTable
*
)
pOper
->
table
;
SRowMeta
rowMeta
;
void
*
pObj
=
NULL
;
int32_t
total_size
=
0
;
int32_t
real_size
=
0
;
if
(
pTable
==
NULL
)
{
if
(
pTable
==
NULL
)
{
sdbError
(
"sdb tables is null"
);
sdbError
(
"sdb tables is null"
);
return
-
1
;
return
TSDB_CODE_OTHERS
;
}
}
if
(
sdbGetRow
(
handle
,
row
))
{
if
(
sdbGetRow
(
pTable
,
pOper
->
pObj
))
{
switch
(
pTable
->
keyType
)
{
sdbError
(
"table:%s, failed to insert record:%s, already exist"
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
pObj
));
case
SDB_KEYTYPE_STRING
:
return
TSDB_CODE_ALREADY_THERE
;
sdbError
(
"table:%s, failed to insert record:%s sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
pTable
->
id
);
break
;
case
SDB_KEYTYPE_AUTO
:
sdbError
(
"table:%s, failed to insert record:%d sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
pTable
->
id
);
break
;
default:
sdbError
(
"table:%s, failed to insert record sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
sdbVersion
,
pTable
->
id
);
break
;
}
return
-
1
;
}
}
total_size
=
sizeof
(
SRowHead
)
+
pTable
->
maxRowSize
+
sizeof
(
TSCKSUM
);
pthread_mutex_lock
(
&
pTable
->
mutex
);
SRowHead
*
rowHead
=
(
SRowHead
*
)
malloc
(
total_size
);
if
(
rowHead
==
NULL
)
{
sdbError
(
"table:%s, failed to allocate row head memory"
,
pTable
->
name
);
return
-
1
;
}
memset
(
rowHead
,
0
,
total_size
);
if
(
oper
==
SDB_OPER_GLOBAL
)
{
if
(
pOper
->
type
==
SDB_OPER_TYPE_GLOBAL
)
{
pObj
=
row
;
SForwardMsg
forward
=
{
}
else
{
.
type
=
SDB_FORWARD_TYPE_INSERT
,
pObj
=
(
*
pTable
->
decodeFp
)(
row
);
.
tableId
=
pTable
->
tableId
,
.
version
=
pTable
->
version
+
1
,
.
rowSize
=
pOper
->
rowSize
,
.
rowData
=
pOper
->
rowData
,
};
if
(
sdbForwardDbReqToPeer
(
&
forward
)
!=
0
)
{
sdbError
(
"table:%s, failed to forward record:%s version:%"
PRId64
" sdbversion:%"
PRId64
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
pObj
),
pOper
->
version
,
sdbVersion
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
return
TSDB_CODE_OTHERS
;
}
}
}
pthread_mutex_lock
(
&
pTable
->
mutex
);
int32_t
total_size
=
sizeof
(
SRowHead
)
+
pTable
->
maxRowSize
+
sizeof
(
TSCKSUM
);
SRowHead
*
rowHead
=
(
SRowHead
*
)
calloc
(
1
,
total_size
);
if
(
oper
==
SDB_OPER_GLOBAL
)
{
if
(
rowHead
==
NULL
)
{
if
(
sdbForwardDbReqToPeer
(
pTable
,
SDB_TYPE_INSERT
,
rowHead
->
data
,
rowHead
->
rowSize
)
!=
0
)
{
sdbError
(
"table:%s, failed to insert record"
,
pTable
->
name
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
tfree
(
rowHead
);
sdbError
(
"table:%s, failed to allocate row head memory for record:%s version:%"
PRId64
" sdbversion:%"
PRId64
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
pObj
),
pOper
->
version
,
sdbVersion
);
return
-
1
;
return
-
1
;
}
}
if
(
pTable
->
keyType
==
SDB_KEY_TYPE_AUTO
)
{
*
((
uint32_t
*
)
pOper
->
pObj
)
=
++
pTable
->
autoIndex
;
}
}
pTable
->
version
++
;
sdbVersion
++
;
pOper
->
rowData
=
rowHead
->
data
;
(
*
pTable
->
encodeFp
)(
pOper
);
rowHead
->
rowSize
=
pOper
->
rowSize
;
if
(
oper
!=
SDB_OPER_DISK
)
{
rowHead
->
rowSize
=
(
*
pTable
->
encodeFp
)(
pObj
,
rowHead
->
data
,
pTable
->
maxRowSize
);
assert
(
rowHead
->
rowSize
>
0
&&
rowHead
->
rowSize
<=
pTable
->
maxRowSize
);
assert
(
rowHead
->
rowSize
>
0
&&
rowHead
->
rowSize
<=
pTable
->
maxRowSize
);
real_size
=
sizeof
(
SRowHead
)
+
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
);
int32_t
real_size
=
sizeof
(
SRowHead
)
+
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
);
rowHead
->
delimiter
=
SDB_DELIMITER
;
rowHead
->
delimiter
=
SDB_DELIMITER
;
rowHead
->
id
=
pTable
->
id
+
1
;
rowHead
->
version
=
pTable
->
version
;
if
(
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
rowHead
,
real_size
)
<
0
)
{
if
(
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
rowHead
,
real_size
)
<
0
)
{
sdbError
(
"table:%s, failed to get checksum while inserting"
,
pTable
->
name
);
sdbError
(
"table:%s, failed to get checksum while inserting"
,
pTable
->
tableName
);
pTable
->
version
--
;
sdbVersion
--
;
pthread_mutex_unlock
(
&
pTable
->
mutex
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
tfree
(
rowHead
);
tfree
(
rowHead
);
return
-
1
;
return
-
1
;
}
}
twrite
(
pTable
->
fd
,
rowHead
,
real_size
);
twrite
(
pTable
->
fd
,
rowHead
,
real_size
);
pTable
->
s
ize
+=
real_size
;
pTable
->
fileS
ize
+=
real_size
;
sdbFinishCommit
(
pTable
);
sdbFinishCommit
(
pTable
);
}
tfree
(
rowHead
);
// update in SDB layer
// update in SDB layer
rowMeta
.
id
=
pTable
->
id
;
SRowMeta
rowMeta
;
rowMeta
.
offset
=
pTable
->
size
;
rowMeta
.
version
=
pTable
->
version
;
rowMeta
.
rowSize
=
rowHead
->
rowSize
;
rowMeta
.
offset
=
pTable
->
fileSize
;
rowMeta
.
row
=
pObj
;
rowMeta
.
rowSize
=
pOper
->
rowSize
;
(
*
sdbAddIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
pObj
,
&
rowMeta
);
rowMeta
.
row
=
pOper
->
pObj
;
(
*
sdbAddIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
pOper
->
pObj
,
&
rowMeta
);
if
(
pTable
->
keyType
==
SDB_KEYTYPE_AUTO
)
{
*
((
uint32_t
*
)
pObj
)
=
++
pTable
->
autoIndex
;
}
pTable
->
numOfRows
++
;
pTable
->
numOfRows
++
;
if
(
oper
!=
SDB_OPER_DISK
)
{
pTable
->
id
++
;
sdbVersion
++
;
}
pthread_mutex_unlock
(
&
pTable
->
mutex
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
switch
(
pTable
->
keyType
)
{
sdbTrace
(
"table:%s, a record is inserted:%s, sdbversion:%"
PRId64
" version:%"
PRId64
" rowSize:%d numOfRows:%d fileSize:%"
PRId64
,
case
SDB_KEYTYPE_STRING
:
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
pObj
),
sdbVersion
,
pOper
->
version
,
pOper
->
rowSize
,
pTable
->
numOfRows
,
pTable
->
fileSize
);
sdbTrace
(
"table:%s, a record is inserted:%s, sdbVersion:%"
PRId64
" id:%"
PRId64
" rowSize:%d numOfRows:%d fileSize:%"
PRId64
,
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
rowHead
->
id
,
rowHead
->
rowSize
,
pTable
->
numOfRows
,
pTable
->
size
);
break
;
case
SDB_KEYTYPE_AUTO
:
sdbTrace
(
"table:%s, a record is inserted:%d, sdbVersion:%"
PRId64
" id:%"
PRId64
" rowSize:%d numOfRows:%d fileSize:%"
PRId64
,
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
rowHead
->
id
,
rowHead
->
rowSize
,
pTable
->
numOfRows
,
pTable
->
size
);
break
;
default:
sdbTrace
(
"table:%s, a record is inserted, sdbVersion:%"
PRId64
" id:%"
PRId64
" rowSize:%d numOfRows:%d fileSize:%"
PRId64
,
pTable
->
name
,
sdbVersion
,
rowHead
->
id
,
rowHead
->
rowSize
,
pTable
->
numOfRows
,
pTable
->
size
);
break
;
}
(
*
pTable
->
insertFp
)(
pObj
);
tfree
(
rowHead
);
(
*
pTable
->
insertFp
)(
pOper
);
return
0
;
return
0
;
}
}
// row here can be object or null-terminated string
// row here can be object or null-terminated string
int32_t
sdbDeleteRow
(
void
*
handle
,
void
*
row
,
ESdbOperType
oper
)
{
int32_t
sdbDeleteRow
(
SSdbOperDesc
*
pOper
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SSdbTable
*
pTable
=
(
SSdbTable
*
)
pOper
->
table
;
SRowMeta
*
pMeta
=
NULL
;
void
*
pMetaRow
=
NULL
;
SRowHead
*
rowHead
=
NULL
;
int32_t
rowSize
=
0
;
int32_t
total_size
=
0
;
if
(
pTable
==
NULL
)
return
-
1
;
if
(
pTable
==
NULL
)
return
-
1
;
pMeta
=
sdbGetRowMeta
(
handle
,
row
);
SRowMeta
*
pMeta
=
sdbGetRowMeta
(
pTable
,
pOper
->
pObj
);
if
(
pMeta
==
NULL
)
{
if
(
pMeta
==
NULL
)
{
sdbTrace
(
"table:%s, record is not there, delete failed"
,
pTable
->
n
ame
);
sdbTrace
(
"table:%s, record is not there, delete failed"
,
pTable
->
tableN
ame
);
return
-
1
;
return
-
1
;
}
}
pMetaRow
=
pMeta
->
row
;
void
*
pMetaRow
=
pMeta
->
row
;
assert
(
pMetaRow
!=
NULL
);
assert
(
pMetaRow
!=
NULL
);
switch
(
pTable
->
keyType
)
{
pthread_mutex_lock
(
&
pTable
->
mutex
);
case
SDB_KEYTYPE_STRING
:
rowSize
=
strlen
((
char
*
)
row
)
+
1
;
if
(
pOper
->
type
==
SDB_OPER_TYPE_GLOBAL
)
{
break
;
SForwardMsg
forward
=
{
case
SDB_KEYTYPE_AUTO
:
.
type
=
SDB_FORWARD_TYPE_DELETE
,
rowSize
=
sizeof
(
uint64_t
);
.
tableId
=
pTable
->
tableId
,
break
;
.
version
=
pTable
->
version
+
1
,
default:
.
rowSize
=
pOper
->
rowSize
,
.
rowData
=
pOper
->
rowData
,
};
if
(
sdbForwardDbReqToPeer
(
&
forward
)
==
0
)
{
sdbError
(
"table:%s, failed to delete record"
,
pTable
->
tableName
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
return
-
1
;
return
-
1
;
}
}
}
total_size
=
sizeof
(
SRowHead
)
+
rowSize
+
sizeof
(
TSCKSUM
);
int32_t
total_size
=
sizeof
(
SRowHead
)
+
pOper
->
rowSize
+
sizeof
(
TSCKSUM
);
rowHead
=
(
SRowHead
*
)
malloc
(
total_size
);
SRowHead
*
rowHead
=
(
SRowHead
*
)
calloc
(
1
,
total_size
);
if
(
rowHead
==
NULL
)
{
if
(
rowHead
==
NULL
)
{
sdbError
(
"failed to allocate row head memory, sdb:%s"
,
pTable
->
name
);
sdbError
(
"failed to allocate row head memory, sdb:%s"
,
pTable
->
tableName
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
return
-
1
;
return
-
1
;
}
}
memset
(
rowHead
,
0
,
total_size
);
pthread_mutex_lock
(
&
pTable
->
mutex
);
pTable
->
version
++
;
sdbVersion
++
;
if
(
oper
==
SDB_OPER_GLOBAL
)
{
int32_t
rowSize
=
0
;
if
(
sdbForwardDbReqToPeer
(
pTable
,
SDB_TYPE_DELETE
,
(
char
*
)
row
,
rowSize
)
==
0
)
{
switch
(
pTable
->
keyType
)
{
sdbError
(
"table:%s, failed to delete record"
,
pTable
->
name
);
case
SDB_KEY_TYPE_STRING
:
pthread_mutex_unlock
(
&
pTable
->
mutex
);
rowSize
=
strlen
((
char
*
)
pOper
->
rowData
)
+
1
;
tfree
(
rowHead
);
break
;
case
SDB_KEY_TYPE_AUTO
:
rowSize
=
sizeof
(
uint64_t
);
break
;
default:
return
-
1
;
return
-
1
;
}
}
}
if
(
oper
!=
SDB_OPER_DISK
)
{
rowHead
->
delimiter
=
SDB_DELIMITER
;
rowHead
->
delimiter
=
SDB_DELIMITER
;
rowHead
->
rowSize
=
rowSize
;
rowHead
->
rowSize
=
rowSize
;
rowHead
->
id
=
-
(
pTable
->
id
);
rowHead
->
version
=
-
(
pTable
->
version
);
memcpy
(
rowHead
->
data
,
row
,
rowSize
);
memcpy
(
rowHead
->
data
,
pOper
->
rowData
,
pOper
->
rowSize
);
if
(
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
rowHead
,
total_size
)
<
0
)
{
if
(
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
rowHead
,
total_size
)
<
0
)
{
sdbError
(
"failed to get checksum while inserting, sdb:%s"
,
pTable
->
name
);
sdbError
(
"failed to get checksum while inserting, sdb:%s"
,
pTable
->
tableName
);
pTable
->
version
--
;
sdbVersion
--
;
pthread_mutex_unlock
(
&
pTable
->
mutex
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
tfree
(
rowHead
);
tfree
(
rowHead
);
return
-
1
;
return
-
1
;
}
}
twrite
(
pTable
->
fd
,
rowHead
,
total_size
);
twrite
(
pTable
->
fd
,
rowHead
,
total_size
);
pTable
->
s
ize
+=
total_size
;
pTable
->
fileS
ize
+=
total_size
;
sdbFinishCommit
(
pTable
);
sdbFinishCommit
(
pTable
);
}
switch
(
pTable
->
keyType
)
{
tfree
(
rowHead
);
case
SDB_KEYTYPE_STRING
:
sdbTrace
(
"table:%s, a record is deleted:%s, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%d"
,
sdbTrace
(
"table:%s, a record is deleted:%s, sdbversion:%"
PRId64
" id:%"
PRId64
" numOfRows:%d"
,
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
rowData
),
sdbVersion
,
pTable
->
version
,
pTable
->
numOfRows
);
break
;
case
SDB_KEYTYPE_AUTO
:
sdbTrace
(
"table:%s, a record is deleted:%d, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%d"
,
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
break
;
default:
sdbTrace
(
"table:%s, a record is deleted, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%d"
,
pTable
->
name
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
break
;
}
// Delete from current layer
// Delete from current layer
(
*
sdbDeleteIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
row
);
(
*
sdbDeleteIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
,
pOper
->
pObj
);
pTable
->
numOfRows
--
;
pTable
->
numOfRows
--
;
if
(
oper
!=
SDB_OPER_DISK
)
{
pTable
->
id
++
;
sdbVersion
++
;
}
pthread_mutex_unlock
(
&
pTable
->
mutex
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
tfree
(
rowHead
);
(
*
pTable
->
deleteFp
)(
pOper
);
(
*
pTable
->
destroyFp
)(
pOper
);
(
*
pTable
->
deleteFp
)(
pMetaRow
);
(
*
pTable
->
destroyFp
)(
pMetaRow
);
return
0
;
return
0
;
}
}
// row here can be the object or the string info (encoded string)
// row here can be the object or the string info (encoded string)
int32_t
sdbUpdateRow
(
void
*
handle
,
void
*
row
,
int32_t
updateSize
,
ESdbOperType
oper
)
{
int32_t
sdbUpdateRow
(
SSdbOperDesc
*
pOper
)
{
SSdbTable
*
pTable
=
(
SSdbTable
*
)
handle
;
SSdbTable
*
pTable
=
(
SSdbTable
*
)
pOper
->
table
;
SRowMeta
*
pMeta
=
NULL
;
if
(
pTable
==
NULL
)
return
-
1
;
int32_t
total_size
=
0
;
int32_t
real_size
=
0
;
if
(
pTable
==
NULL
||
row
==
NULL
)
return
-
1
;
SRowMeta
*
pMeta
=
sdbGetRowMeta
(
pTable
,
pOper
->
rowData
);
pMeta
=
sdbGetRowMeta
(
handle
,
row
);
if
(
pMeta
==
NULL
)
{
if
(
pMeta
==
NULL
)
{
switch
(
pTable
->
keyType
)
{
sdbError
(
"table:%s, failed to update record:%s, record is not there, sdbversion:%"
PRId64
" id:%"
PRId64
,
case
SDB_KEYTYPE_STRING
:
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
rowData
),
sdbVersion
,
pTable
->
version
);
sdbError
(
"table:%s, failed to update record:%s, record is not there, sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
pTable
->
id
);
break
;
case
SDB_KEYTYPE_AUTO
:
sdbError
(
"table:%s, failed to update record:%d, record is not there, sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
pTable
->
id
);
break
;
default:
sdbError
(
"table:%s, failed to update record, record is not there, sdbVersion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
sdbVersion
,
pTable
->
id
);
break
;
}
return
-
1
;
return
-
1
;
}
}
void
*
pMetaRow
=
pMeta
->
row
;
void
*
pMetaRow
=
pMeta
->
row
;
assert
(
pMetaRow
!=
NULL
);
assert
(
pMetaRow
!=
NULL
);
total_size
=
sizeof
(
SRowHead
)
+
pTable
->
maxRowSize
+
sizeof
(
TSCKSUM
);
SRowHead
*
rowHead
=
(
SRowHead
*
)
malloc
(
total_size
);
if
(
rowHead
==
NULL
)
{
sdbError
(
"failed to allocate row head memory, sdb:%s"
,
pTable
->
name
);
return
-
1
;
}
memset
(
rowHead
,
0
,
total_size
);
pthread_mutex_lock
(
&
pTable
->
mutex
);
pthread_mutex_lock
(
&
pTable
->
mutex
);
if
(
oper
==
SDB_OPER_GLOBAL
)
{
if
(
pOper
->
type
==
SDB_OPER_TYPE_GLOBAL
)
{
if
(
sdbForwardDbReqToPeer
(
pTable
,
SDB_TYPE_UPDATE
,
rowHead
->
data
,
rowHead
->
rowSize
)
==
0
)
{
SForwardMsg
forward
=
{
sdbError
(
"table:%s, failed to update record"
,
pTable
->
name
);
.
type
=
SDB_FORWARD_TYPE_UPDATE
,
.
tableId
=
pTable
->
tableId
,
.
version
=
pOper
->
version
+
1
,
.
rowSize
=
pOper
->
rowSize
,
.
rowData
=
pOper
->
rowData
,
};
if
(
sdbForwardDbReqToPeer
(
&
forward
)
==
0
)
{
sdbError
(
"table:%s, failed to update record"
,
pTable
->
tableName
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
tfree
(
rowHead
);
return
-
1
;
return
-
1
;
}
}
}
}
if
(
pMetaRow
!=
row
)
{
int32_t
total_size
=
sizeof
(
SRowHead
)
+
pTable
->
maxRowSize
+
sizeof
(
TSCKSUM
);
memcpy
(
rowHead
->
data
,
row
,
updateSize
);
SRowHead
*
rowHead
=
(
SRowHead
*
)
calloc
(
1
,
total_size
);
rowHead
->
rowSize
=
updateSize
;
if
(
rowHead
==
NULL
)
{
sdbError
(
"table:%s, failed to allocate row head memory"
,
pTable
->
tableName
);
return
-
1
;
}
if
(
pMetaRow
!=
pOper
->
pObj
)
{
memcpy
(
rowHead
->
data
,
pOper
->
rowData
,
pOper
->
rowSize
);
rowHead
->
rowSize
=
pOper
->
rowSize
;
}
else
{
}
else
{
rowHead
->
rowSize
=
(
*
pTable
->
encodeFp
)(
pMetaRow
,
rowHead
->
data
,
pTable
->
maxRowSize
);
SSdbOperDesc
oper
=
{
.
table
=
pTable
,
.
rowData
=
rowHead
->
data
,
.
maxRowSize
=
pTable
->
maxRowSize
,
.
pObj
=
pOper
->
pObj
};
(
*
pTable
->
encodeFp
)(
&
oper
);
rowHead
->
rowSize
=
oper
.
rowSize
;
}
}
real_size
=
sizeof
(
SRowHead
)
+
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
);
pTable
->
version
++
;
sdbVersion
++
;
// write to the new position
int32_t
real_size
=
sizeof
(
SRowHead
)
+
rowHead
->
rowSize
+
sizeof
(
TSCKSUM
);
if
(
oper
!=
SDB_OPER_DISK
)
{
rowHead
->
delimiter
=
SDB_DELIMITER
;
rowHead
->
delimiter
=
SDB_DELIMITER
;
rowHead
->
id
=
pTable
->
id
;
rowHead
->
version
=
pTable
->
version
;
if
(
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
rowHead
,
real_size
)
<
0
)
{
if
(
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
rowHead
,
real_size
)
<
0
)
{
sdbError
(
"failed to get checksum, sdb:%s id:%d"
,
pTable
->
name
,
rowHead
->
id
);
sdbError
(
"failed to get checksum, sdb:%s version:%d"
,
pTable
->
tableName
,
rowHead
->
version
);
pTable
->
version
--
;
sdbVersion
--
;
pthread_mutex_unlock
(
&
pTable
->
mutex
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
tfree
(
rowHead
);
tfree
(
rowHead
);
return
-
1
;
return
-
1
;
}
}
twrite
(
pTable
->
fd
,
rowHead
,
real_size
);
twrite
(
pTable
->
fd
,
rowHead
,
real_size
);
pTable
->
fileSize
+=
real_size
;
pMeta
->
id
=
pTable
->
id
;
pMeta
->
offset
=
pTable
->
size
;
pMeta
->
rowSize
=
rowHead
->
rowSize
;
pTable
->
size
+=
real_size
;
sdbFinishCommit
(
pTable
);
sdbFinishCommit
(
pTable
);
}
switch
(
pTable
->
keyType
)
{
sdbTrace
(
"table:%s, a record is updated:%s, sdbversion:%"
PRId64
" id:%"
PRId64
" numOfRows:%"
PRId64
,
case
SDB_KEYTYPE_STRING
:
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pOper
->
rowData
),
sdbVersion
,
pTable
->
version
,
pTable
->
numOfRows
);
sdbTrace
(
"table:%s, a record is updated:%s, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%"
PRId64
,
pTable
->
name
,
(
char
*
)
row
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
break
;
case
SDB_KEYTYPE_AUTO
:
sdbTrace
(
"table:%s, a record is updated:%d, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%"
PRId64
,
pTable
->
name
,
*
(
int32_t
*
)
row
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
break
;
default:
sdbTrace
(
"table:%s, a record is updated, sdbVersion:%"
PRId64
" id:%"
PRId64
" numOfRows:%"
PRId64
,
pTable
->
name
,
sdbVersion
,
pTable
->
id
,
pTable
->
numOfRows
);
break
;
}
if
(
oper
!=
SDB_OPER_DISK
)
{
pMeta
->
version
=
pTable
->
version
;
pTable
->
id
++
;
pMeta
->
offset
=
pTable
->
fileSize
;
sdbVersion
++
;
pMeta
->
rowSize
=
rowHead
->
rowSize
;
}
pthread_mutex_unlock
(
&
pTable
->
mutex
);
pthread_mutex_unlock
(
&
pTable
->
mutex
);
(
*
pTable
->
updateFp
)(
p
MetaRow
);
// update in upper layer
(
*
pTable
->
updateFp
)(
p
Oper
);
// update in upper layer
tfree
(
rowHead
);
tfree
(
rowHead
);
...
@@ -708,7 +735,12 @@ void sdbCloseTable(void *handle) {
...
@@ -708,7 +735,12 @@ void sdbCloseTable(void *handle) {
while
(
1
)
{
while
(
1
)
{
pNode
=
sdbFetchRow
(
handle
,
pNode
,
&
row
);
pNode
=
sdbFetchRow
(
handle
,
pNode
,
&
row
);
if
(
row
==
NULL
)
break
;
if
(
row
==
NULL
)
break
;
(
*
pTable
->
destroyFp
)(
row
);
SSdbOperDesc
oper
=
{
.
table
=
pTable
,
.
rowData
=
row
,
};
(
*
pTable
->
destroyFp
)(
&
oper
);
}
}
if
(
sdbCleanUpIndexFp
[
pTable
->
keyType
])
(
*
sdbCleanUpIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
);
if
(
sdbCleanUpIndexFp
[
pTable
->
keyType
])
(
*
sdbCleanUpIndexFp
[
pTable
->
keyType
])(
pTable
->
iHandle
);
...
@@ -718,20 +750,20 @@ void sdbCloseTable(void *handle) {
...
@@ -718,20 +750,20 @@ void sdbCloseTable(void *handle) {
pthread_mutex_destroy
(
&
pTable
->
mutex
);
pthread_mutex_destroy
(
&
pTable
->
mutex
);
sdbNumOfTables
--
;
sdbNumOfTables
--
;
sdbTrace
(
"table:%s is closed, id:%"
PRId64
" numOfTables:%d"
,
pTable
->
name
,
pTable
->
id
,
sdbNumOfTables
);
sdbTrace
(
"table:%s is closed, id:%"
PRId64
" numOfTables:%d"
,
pTable
->
tableName
,
pTable
->
version
,
sdbNumOfTables
);
tfree
(
pTable
);
tfree
(
pTable
);
}
}
void
sdbResetTable
(
SSdbTable
*
pTable
)
{
void
sdbResetTable
(
SSdbTable
*
pTable
)
{
/*
SRowHead rowHead; */
/*
SRowMeta rowMeta;
SRowMeta rowMeta;
int32_t bytes;
int32_t bytes;
int32_t total_size = 0;
int32_t total_size = 0;
int32_t real_size = 0;
int32_t real_size = 0;
SRowHead *rowHead = NULL;
SRowHead *rowHead = NULL;
void * pMetaRow = NULL;
void * pMetaRow = NULL;
int64_t
oldId
=
pTable
->
id
;
int64_t oldId = pTable->
version
;
int32_t oldNumOfRows = pTable->numOfRows;
int32_t oldNumOfRows = pTable->numOfRows;
if (sdbOpenSdbFile(pTable) < 0) return;
if (sdbOpenSdbFile(pTable) < 0) return;
...
@@ -740,18 +772,18 @@ void sdbResetTable(SSdbTable *pTable) {
...
@@ -740,18 +772,18 @@ void sdbResetTable(SSdbTable *pTable) {
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
rowHead = (SRowHead *)malloc(total_size);
rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) {
if (rowHead == NULL) {
sdbError
(
"failed to allocate row head memory for reset, sdb:%s"
,
pTable
->
n
ame
);
sdbError("failed to allocate row head memory for reset, sdb:%s", pTable->
tableN
ame);
return;
return;
}
}
sdbPrint
(
"open sdb file:%s for reset table"
,
pTable
->
f
n
);
sdbPrint("open sdb file:%s for reset table", pTable->f
ileName
);
while (1) {
while (1) {
memset(rowHead, 0, total_size);
memset(rowHead, 0, total_size);
bytes = read(pTable->fd, rowHead, sizeof(SRowHead));
bytes = read(pTable->fd, rowHead, sizeof(SRowHead));
if (bytes < 0) {
if (bytes < 0) {
sdbError
(
"failed to read sdb file:%s"
,
pTable
->
f
n
);
sdbError("failed to read sdb file:%s", pTable->f
ileName
);
tfree(rowHead);
tfree(rowHead);
return;
return;
}
}
...
@@ -759,40 +791,40 @@ void sdbResetTable(SSdbTable *pTable) {
...
@@ -759,40 +791,40 @@ void sdbResetTable(SSdbTable *pTable) {
if (bytes == 0) break;
if (bytes == 0) break;
if (bytes < sizeof(SRowHead) || rowHead->delimiter != SDB_DELIMITER) {
if (bytes < sizeof(SRowHead) || rowHead->delimiter != SDB_DELIMITER) {
pTable
->
s
ize
++
;
pTable->
fileS
ize++;
lseek(pTable->fd, -(bytes - 1), SEEK_CUR);
lseek(pTable->fd, -(bytes - 1), SEEK_CUR);
continue;
continue;
}
}
if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) {
if (rowHead->rowSize < 0 || rowHead->rowSize > pTable->maxRowSize) {
sdbError
(
"error row size in sdb file:%s for reset,
id
:%d rowSize:%d maxRowSize:%d"
,
sdbError("error row size in sdb file:%s for reset,
version
:%d rowSize:%d maxRowSize:%d",
pTable
->
f
n
,
rowHead
->
id
,
rowHead
->
rowSize
,
pTable
->
maxRowSize
);
pTable->f
ileName, rowHead->version
, rowHead->rowSize, pTable->maxRowSize);
pTable
->
s
ize
+=
sizeof
(
SRowHead
);
pTable->
fileS
ize += sizeof(SRowHead);
continue;
continue;
}
}
bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM));
bytes = read(pTable->fd, rowHead->data, rowHead->rowSize + sizeof(TSCKSUM));
if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) {
if (bytes < rowHead->rowSize + sizeof(TSCKSUM)) {
sdbError
(
"failed to read sdb file:%s for reset,
id:%d rowSize:%d"
,
pTable
->
fn
,
rowHead
->
id
,
rowHead
->
rowSize
);
sdbError("failed to read sdb file:%s for reset,
version:%d rowSize:%d", pTable->fileName, rowHead->version
, rowHead->rowSize);
break;
break;
}
}
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) {
if (!taosCheckChecksumWhole((uint8_t *)rowHead, real_size)) {
sdbError
(
"error sdb checksum, sdb:%s
id:%d, skip"
,
pTable
->
name
,
rowHead
->
id
);
sdbError("error sdb checksum, sdb:%s
version:%d, skip", pTable->tableName, rowHead->version
);
pTable
->
s
ize
+=
real_size
;
pTable->
fileS
ize += real_size;
continue;
continue;
}
}
if
(
abs
(
rowHead
->
id
)
>
oldId
)
{
// not operated
if (abs(rowHead->
version
) > oldId) { // not operated
pMetaRow = sdbGetRow(pTable, rowHead->data);
pMetaRow = sdbGetRow(pTable, rowHead->data);
if (pMetaRow == NULL) { // New object
if (pMetaRow == NULL) { // New object
if
(
rowHead
->
id
<
0
)
{
if (rowHead->
version
< 0) {
sdbError
(
"error sdb negative
id:%d, sdb:%s, skip"
,
rowHead
->
id
,
pTable
->
n
ame
);
sdbError("error sdb negative
version:%d, sdb:%s, skip", rowHead->version, pTable->tableN
ame);
} else {
} else {
rowMeta
.
id
=
rowHead
->
id
;
rowMeta.
version = rowHead->version
;
// TODO:Get rid of the rowMeta.offset and rowSize
// TODO:Get rid of the rowMeta.offset and rowSize
rowMeta
.
offset
=
pTable
->
s
ize
;
rowMeta.offset = pTable->
fileS
ize;
rowMeta.rowSize = rowHead->rowSize;
rowMeta.rowSize = rowHead->rowSize;
rowMeta.row = (*pTable->decodeFp)(rowHead->data);
rowMeta.row = (*pTable->decodeFp)(rowHead->data);
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta);
(*sdbAddIndexFp[pTable->keyType])(pTable->iHandle, rowMeta.row, &rowMeta);
...
@@ -801,7 +833,7 @@ void sdbResetTable(SSdbTable *pTable) {
...
@@ -801,7 +833,7 @@ void sdbResetTable(SSdbTable *pTable) {
(*pTable->insertFp)(rowMeta.row);
(*pTable->insertFp)(rowMeta.row);
}
}
} else { // already exists
} else { // already exists
if
(
rowHead
->
id
<
0
)
{
// Delete the object
if (rowHead->
version
< 0) { // Delete the object
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data);
(*sdbDeleteIndexFp[pTable->keyType])(pTable->iHandle, rowHead->data);
(*pTable->destroyFp)(pMetaRow);
(*pTable->destroyFp)(pMetaRow);
pTable->numOfRows--;
pTable->numOfRows--;
...
@@ -811,19 +843,21 @@ void sdbResetTable(SSdbTable *pTable) {
...
@@ -811,19 +843,21 @@ void sdbResetTable(SSdbTable *pTable) {
}
}
}
}
pTable
->
s
ize
+=
real_size
;
pTable->
fileS
ize += real_size;
if
(
pTable
->
id
<
abs
(
rowHead
->
id
))
pTable
->
id
=
abs
(
rowHead
->
id
);
if (pTable->
version < abs(rowHead->version)) pTable->version = abs(rowHead->version
);
}
}
sdbVersion
+=
(
pTable
->
id
-
oldId
);
sdbVersion += (pTable->
version
- oldId);
tfree(rowHead);
tfree(rowHead);
sdbPrint
(
"table:%s is updated, sdbVerion:%"
PRId64
" id:%"
PRId64
,
pTable
->
name
,
sdbVersion
,
pTable
->
id
);
sdbPrint("table:%s is updated, sdbVerion:%" PRId64 " id:%" PRId64, pTable->tableName, sdbVersion, pTable->version);
*/
}
}
// TODO:A problem here :use snapshot file to sync another node will cause problem
// TODO:A problem here :use snapshot file to sync another node will cause problem
void
sdbSaveSnapShot
(
void
*
handle
)
{
void
sdbSaveSnapShot
(
void
*
handle
)
{
/*
SSdbTable *pTable = (SSdbTable *)handle;
SSdbTable *pTable = (SSdbTable *)handle;
SRowMeta * pMeta;
SRowMeta * pMeta;
void * pNode = NULL;
void * pNode = NULL;
...
@@ -834,15 +868,14 @@ void sdbSaveSnapShot(void *handle) {
...
@@ -834,15 +868,14 @@ void sdbSaveSnapShot(void *handle) {
uint32_t sdbEcommit = SDB_ENDCOMMIT;
uint32_t sdbEcommit = SDB_ENDCOMMIT;
char * dirc = NULL;
char * dirc = NULL;
char * basec = NULL;
char * basec = NULL;
/* char action = SDB_TYPE_INSERT; */
if (pTable == NULL) return;
if (pTable == NULL) return;
sdbTrace
(
"Table:%s, save the snapshop"
,
pTable
->
n
ame
);
sdbTrace("Table:%s, save the snapshop", pTable->
tableN
ame);
char fn[128] = "\0";
char fn[128] = "\0";
dirc
=
strdup
(
pTable
->
f
n
);
dirc = strdup(pTable->f
ileName
);
basec
=
strdup
(
pTable
->
f
n
);
basec = strdup(pTable->f
ileName
);
sprintf(fn, "%s/.%s", dirname(dirc), basename(basec));
sprintf(fn, "%s/.%s", dirname(dirc), basename(basec));
int32_t fd = open(fn, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
int32_t fd = open(fn, O_RDWR | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
tfree(dirc);
tfree(dirc);
...
@@ -851,7 +884,7 @@ void sdbSaveSnapShot(void *handle) {
...
@@ -851,7 +884,7 @@ void sdbSaveSnapShot(void *handle) {
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM);
SRowHead *rowHead = (SRowHead *)malloc(total_size);
SRowHead *rowHead = (SRowHead *)malloc(total_size);
if (rowHead == NULL) {
if (rowHead == NULL) {
sdbError
(
"failed to allocate memory while saving SDB snapshot, sdb:%s"
,
pTable
->
n
ame
);
sdbError("failed to allocate memory while saving SDB snapshot, sdb:%s", pTable->
tableN
ame);
return;
return;
}
}
memset(rowHead, 0, size);
memset(rowHead, 0, size);
...
@@ -867,17 +900,15 @@ void sdbSaveSnapShot(void *handle) {
...
@@ -867,17 +900,15 @@ void sdbSaveSnapShot(void *handle) {
if (pMeta == NULL) break;
if (pMeta == NULL) break;
rowHead->delimiter = SDB_DELIMITER;
rowHead->delimiter = SDB_DELIMITER;
rowHead
->
id
=
pMeta
->
id
;
rowHead->
version
= pMeta->id;
rowHead->rowSize = (*pTable->encodeFp)(pMeta->row, rowHead->data, pTable->maxRowSize);
rowHead->rowSize = (*pTable->encodeFp)(pMeta->row, rowHead->data, pTable->maxRowSize);
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
real_size = sizeof(SRowHead) + rowHead->rowSize + sizeof(TSCKSUM);
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
if (taosCalcChecksumAppend(0, (uint8_t *)rowHead, real_size) < 0) {
sdbError
(
"failed to get checksum while save sdb %s snapshot"
,
pTable
->
n
ame
);
sdbError("failed to get checksum while save sdb %s snapshot", pTable->
tableN
ame);
tfree(rowHead);
tfree(rowHead);
return;
return;
}
}
/* write(fd, &action, sizeof(action)); */
/* size += sizeof(action); */
twrite(fd, rowHead, real_size);
twrite(fd, rowHead, real_size);
size += real_size;
size += real_size;
twrite(fd, &sdbEcommit, sizeof(sdbEcommit));
twrite(fd, &sdbEcommit, sizeof(sdbEcommit));
...
@@ -889,14 +920,15 @@ void sdbSaveSnapShot(void *handle) {
...
@@ -889,14 +920,15 @@ void sdbSaveSnapShot(void *handle) {
// Remove the old file
// Remove the old file
tclose(pTable->fd);
tclose(pTable->fd);
remove
(
pTable
->
f
n
);
remove(pTable->f
ileName
);
// Rename the .sdb.db file to sdb.db file
// Rename the .sdb.db file to sdb.db file
rename
(
fn
,
pTable
->
f
n
);
rename(fn, pTable->f
ileName
);
pTable->fd = fd;
pTable->fd = fd;
pTable
->
s
ize
=
size
;
pTable->
fileS
ize = size;
pTable->numOfRows = numOfRows;
pTable->numOfRows = numOfRows;
fdatasync(pTable->fd);
fdatasync(pTable->fd);
*/
}
}
void
*
sdbFetchRow
(
void
*
handle
,
void
*
pNode
,
void
**
ppRow
)
{
void
*
sdbFetchRow
(
void
*
handle
,
void
*
pNode
,
void
**
ppRow
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录