Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
63a68154
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
63a68154
编写于
1月 07, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more refact
上级
73254132
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
54 addition
and
944 deletion
+54
-944
include/dnode/vnode/tsdb2/tsdb.h
include/dnode/vnode/tsdb2/tsdb.h
+14
-12
source/dnode/vnode/tsdb2/inc/tsdbCompact.h
source/dnode/vnode/tsdb2/inc/tsdbCompact.h
+10
-10
source/dnode/vnode/tsdb2/inc/tsdbHealth.h
source/dnode/vnode/tsdb2/inc/tsdbHealth.h
+0
-28
source/dnode/vnode/tsdb2/src/tsdbCompact.c
source/dnode/vnode/tsdb2/src/tsdbCompact.c
+3
-0
source/dnode/vnode/tsdb2/src/tsdbFile.c
source/dnode/vnode/tsdb2/src/tsdbFile.c
+3
-3
source/dnode/vnode/tsdb2/src/tsdbHealth.c
source/dnode/vnode/tsdb2/src/tsdbHealth.c
+0
-102
source/dnode/vnode/tsdb2/src/tsdbMain.c
source/dnode/vnode/tsdb2/src/tsdbMain.c
+24
-24
source/dnode/vnode/tsdb2/src/tsdbScan.c
source/dnode/vnode/tsdb2/src/tsdbScan.c
+0
-38
source/dnode/vnode/tsdb2/src/tsdbSync.c
source/dnode/vnode/tsdb2/src/tsdbSync.c
+0
-727
未找到文件。
include/dnode/vnode/tsdb2/tsdb.h
浏览文件 @
63a68154
...
@@ -102,8 +102,8 @@ typedef struct STsdb STsdb;
...
@@ -102,8 +102,8 @@ typedef struct STsdb STsdb;
STsdbCfg
*
tsdbGetCfg
(
const
STsdb
*
repo
);
STsdbCfg
*
tsdbGetCfg
(
const
STsdb
*
repo
);
// --------- TSDB REPOSITORY DEFINITION
// --------- TSDB REPOSITORY DEFINITION
int32_t
tsdbCreateRepo
(
int
repoid
);
//
int32_t tsdbCreateRepo(int repoid);
int32_t
tsdbDropRepo
(
int
repoid
);
//
int32_t tsdbDropRepo(int repoid);
STsdb
*
tsdbOpen
(
STsdbCfg
*
pCfg
,
STsdbAppH
*
pAppH
);
STsdb
*
tsdbOpen
(
STsdbCfg
*
pCfg
,
STsdbAppH
*
pAppH
);
int
tsdbClose
(
STsdb
*
repo
,
int
toCommit
);
int
tsdbClose
(
STsdb
*
repo
,
int
toCommit
);
int32_t
tsdbConfigRepo
(
STsdb
*
repo
,
STsdbCfg
*
pCfg
);
int32_t
tsdbConfigRepo
(
STsdb
*
repo
,
STsdbCfg
*
pCfg
);
...
@@ -221,6 +221,7 @@ typedef struct SMemRef {
...
@@ -221,6 +221,7 @@ typedef struct SMemRef {
SMemSnapshot
snapshot
;
SMemSnapshot
snapshot
;
}
SMemRef
;
}
SMemRef
;
#if 0
typedef struct SFileBlockInfo {
typedef struct SFileBlockInfo {
int32_t numBlocksOfStep;
int32_t numBlocksOfStep;
} SFileBlockInfo;
} SFileBlockInfo;
...
@@ -418,20 +419,21 @@ void tsdbSwitchTable(TsdbQueryHandleT pQueryHandle);
...
@@ -418,20 +419,21 @@ void tsdbSwitchTable(TsdbQueryHandleT pQueryHandle);
int tsdbSyncSend(void *pRepo, SOCKET socketFd);
int tsdbSyncSend(void *pRepo, SOCKET socketFd);
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact
//
//
For TSDB Compact
int
tsdbCompact
(
STsdb
*
pRepo
);
//
int tsdbCompact(STsdb *pRepo);
// For TSDB Health Monitor
// For TSDB Health Monitor
// no problem return true
//
//
no problem return true
bool
tsdbNoProblem
(
STsdb
*
pRepo
);
//
bool tsdbNoProblem(STsdb *pRepo);
// unit of walSize: MB
//
//
unit of walSize: MB
int
tsdbCheckWal
(
STsdb
*
pRepo
,
uint32_t
walSize
);
//
int tsdbCheckWal(STsdb *pRepo, uint32_t walSize);
// for json tag
// // for json tag
void
*
getJsonTagValueElment
(
void
*
data
,
char
*
key
,
int32_t
keyLen
,
char
*
out
,
int16_t
bytes
);
// void *getJsonTagValueElment(void *data, char *key, int32_t keyLen, char *out, int16_t bytes);
void
getJsonTagValueAll
(
void
*
data
,
void
*
dst
,
int16_t
bytes
);
// void getJsonTagValueAll(void *data, void *dst, int16_t bytes);
char
*
parseTagDatatoJson
(
void
*
p
);
// char *parseTagDatatoJson(void *p);
#endif
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/vnode/tsdb2/inc/tsdbCompact.h
浏览文件 @
63a68154
...
@@ -12,17 +12,17 @@
...
@@ -12,17 +12,17 @@
* You should have received a copy of the GNU Affero General Public License
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#ifndef _TD_TSDB_COMPACT_H_
//
#ifndef _TD_TSDB_COMPACT_H_
#define _TD_TSDB_COMPACT_H_
//
#define _TD_TSDB_COMPACT_H_
#ifdef __cplusplus
//
#ifdef __cplusplus
extern
"C"
{
//
extern "C" {
#endif
//
#endif
void
*
tsdbCompactImpl
(
STsdb
*
pRepo
);
//
void *tsdbCompactImpl(STsdb *pRepo);
#ifdef __cplusplus
//
#ifdef __cplusplus
}
//
}
#endif
//
#endif
#endif
/* _TD_TSDB_COMPACT_H_ */
// #endif /* _TD_TSDB_COMPACT_H_ */
\ No newline at end of file
\ No newline at end of file
source/dnode/vnode/tsdb2/inc/tsdbHealth.h
已删除
100644 → 0
浏览文件 @
73254132
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// #ifndef _TD_TSDB_HEALTH_H_
// #define _TD_TSDB_HEALTH_H_
// #include "os.h"
// #include "tsdb.h"
// bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
// int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
// bool tsdbIdleMemEnough();
// bool tsdbAllowNewBlock(STsdbRepo* pRepo);
// #endif /* _TD_TSDB_BUFFER_H_ */
source/dnode/vnode/tsdb2/src/tsdbCompact.c
浏览文件 @
63a68154
...
@@ -12,6 +12,8 @@
...
@@ -12,6 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#if 0
#include "tsdbint.h"
#include "tsdbint.h"
typedef struct {
typedef struct {
...
@@ -538,3 +540,4 @@ static int tsdbCompactMeta(STsdb *pRepo) {
...
@@ -538,3 +540,4 @@ static int tsdbCompactMeta(STsdb *pRepo) {
return 0;
return 0;
}
}
#endif
\ No newline at end of file
source/dnode/vnode/tsdb2/src/tsdbFile.c
浏览文件 @
63a68154
...
@@ -217,7 +217,7 @@ int tsdbScanAndTryFixMFile(STsdb *pRepo) {
...
@@ -217,7 +217,7 @@ int tsdbScanAndTryFixMFile(STsdb *pRepo) {
return
-
1
;
return
-
1
;
}
}
if
(
taosFtruncate
(
mf
.
fd
,
mf
.
info
.
size
)
<
0
)
{
if
(
taosFtruncate
File
(
mf
.
fd
,
mf
.
info
.
size
)
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbCloseMFile
(
&
mf
);
tsdbCloseMFile
(
&
mf
);
return
-
1
;
return
-
1
;
...
@@ -276,7 +276,7 @@ static int tsdbRollBackMFile(SMFile *pMFile) {
...
@@ -276,7 +276,7 @@ static int tsdbRollBackMFile(SMFile *pMFile) {
return
-
1
;
return
-
1
;
}
}
if
(
taosFtruncate
(
TSDB_FILE_FD
(
&
mf
),
pMFile
->
info
.
size
)
<
0
)
{
if
(
taosFtruncate
File
(
TSDB_FILE_FD
(
&
mf
),
pMFile
->
info
.
size
)
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbCloseMFile
(
&
mf
);
tsdbCloseMFile
(
&
mf
);
return
-
1
;
return
-
1
;
...
@@ -459,7 +459,7 @@ static int tsdbScanAndTryFixDFile(STsdb *pRepo, SDFile *pDFile) {
...
@@ -459,7 +459,7 @@ static int tsdbScanAndTryFixDFile(STsdb *pRepo, SDFile *pDFile) {
return
-
1
;
return
-
1
;
}
}
if
(
taosFtruncate
(
df
.
fd
,
df
.
info
.
size
)
<
0
)
{
if
(
taosFtruncate
File
(
df
.
fd
,
df
.
info
.
size
)
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbCloseDFile
(
&
df
);
tsdbCloseDFile
(
&
df
);
return
-
1
;
return
-
1
;
...
...
source/dnode/vnode/tsdb2/src/tsdbHealth.c
已删除
100644 → 0
浏览文件 @
73254132
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#include "tsdbHealth.h"
#include "os.h"
#include "query.h"
#include "tarray.h"
#include "tglobal.h"
#include "tlist.h"
#include "tmsg.h"
#include "tsdbBuffer.h"
#include "tsdbLog.h"
#include "tsdbint.h"
#include "tthread.h"
#include "ttimer.h"
// return malloc new block count
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo) {
STsdbBufPool* pPool = pRepo->pPool;
int32_t cnt = 0;
if (tsdbAllowNewBlock(pRepo)) {
STsdbBufBlock* pBufBlock = tsdbNewBufBlock(pPool->bufBlockSize);
if (pBufBlock) {
if (tdListAppend(pPool->bufBlockList, (void*)(&pBufBlock)) < 0) {
// append error
tsdbFreeBufBlock(pBufBlock);
} else {
pPool->nElasticBlocks++;
cnt++;
}
}
}
return cnt;
}
// switch anther thread to run
void* cbKillQueryFree(void* param) {
STsdbRepo* pRepo = (STsdbRepo*)param;
// vnode
if (pRepo->appH.notifyStatus) {
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
}
// free
if (pRepo->pthread) {
void* p = pRepo->pthread;
pRepo->pthread = NULL;
free(p);
}
return NULL;
}
// return true do free , false do nothing
bool tsdbUrgeQueryFree(STsdbRepo* pRepo) {
// check previous running
if (pRepo->pthread && taosThreadRunning(pRepo->pthread)) {
tsdbWarn("vgId:%d pre urge thread is runing. nBlocks=%d nElasticBlocks=%d", REPO_ID(pRepo),
pRepo->pPool->nBufBlocks, pRepo->pPool->nElasticBlocks);
return false;
}
// create new
pRepo->pthread = taosCreateThread(cbKillQueryFree, pRepo);
if (pRepo->pthread == NULL) {
tsdbError("vgId:%d create urge thread error.", REPO_ID(pRepo));
return false;
}
return true;
}
bool tsdbAllowNewBlock(STsdbRepo* pRepo) {
int32_t nMaxElastic = pRepo->config.totalBlocks / 3;
STsdbBufPool* pPool = pRepo->pPool;
if (pPool->nElasticBlocks >= nMaxElastic) {
tsdbWarn("vgId:%d tsdbAllowNewBlock return fasle. nElasticBlock(%d) >= MaxElasticBlocks(%d)", REPO_ID(pRepo),
pPool->nElasticBlocks, nMaxElastic);
return false;
}
return true;
}
bool tsdbNoProblem(STsdbRepo* pRepo) {
if (listNEles(pRepo->pPool->bufBlockList) == 0) return false;
return true;
}
#endif
\ No newline at end of file
source/dnode/vnode/tsdb2/src/tsdbMain.c
浏览文件 @
63a68154
...
@@ -32,36 +32,36 @@ static void tsdbStopStream(STsdb *pRepo);
...
@@ -32,36 +32,36 @@ static void tsdbStopStream(STsdb *pRepo);
static
int
tsdbRestoreLastColumns
(
STsdb
*
pRepo
,
STable
*
pTable
,
SReadH
*
pReadh
);
static
int
tsdbRestoreLastColumns
(
STsdb
*
pRepo
,
STable
*
pTable
,
SReadH
*
pReadh
);
static
int
tsdbRestoreLastRow
(
STsdb
*
pRepo
,
STable
*
pTable
,
SReadH
*
pReadh
,
SBlockIdx
*
pIdx
);
static
int
tsdbRestoreLastRow
(
STsdb
*
pRepo
,
STable
*
pTable
,
SReadH
*
pReadh
,
SBlockIdx
*
pIdx
);
// Function declaration
//
//
Function declaration
int32_t
tsdbCreateRepo
(
int
repoid
)
{
//
int32_t tsdbCreateRepo(int repoid) {
char
tsdbDir
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
//
char tsdbDir[TSDB_FILENAME_LEN] = "\0";
char
dataDir
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
//
char dataDir[TSDB_FILENAME_LEN] = "\0";
tsdbGetRootDir
(
repoid
,
tsdbDir
);
//
tsdbGetRootDir(repoid, tsdbDir);
if
(
tfsMkdir
(
tsdbDir
)
<
0
)
{
//
if (tfsMkdir(tsdbDir) < 0) {
goto
_err
;
//
goto _err;
}
//
}
tsdbGetDataDir
(
repoid
,
dataDir
);
//
tsdbGetDataDir(repoid, dataDir);
if
(
tfsMkdir
(
dataDir
)
<
0
)
{
//
if (tfsMkdir(dataDir) < 0) {
goto
_err
;
//
goto _err;
}
//
}
// TODO: need to create current file with nothing in
//
// TODO: need to create current file with nothing in
return
0
;
//
return 0;
_err:
//
_err:
tsdbError
(
"vgId:%d failed to create TSDB repository since %s"
,
repoid
,
tstrerror
(
terrno
));
//
tsdbError("vgId:%d failed to create TSDB repository since %s", repoid, tstrerror(terrno));
return
-
1
;
//
return -1;
}
//
}
int32_t
tsdbDropRepo
(
int
repoid
)
{
//
int32_t tsdbDropRepo(int repoid) {
char
tsdbDir
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
//
char tsdbDir[TSDB_FILENAME_LEN] = "\0";
tsdbGetRootDir
(
repoid
,
tsdbDir
);
//
tsdbGetRootDir(repoid, tsdbDir);
return
tfsRmdir
(
tsdbDir
);
//
return tfsRmdir(tsdbDir);
}
//
}
STsdb
*
tsdbOpen
(
STsdbCfg
*
pCfg
,
STsdbAppH
*
pAppH
)
{
STsdb
*
tsdbOpen
(
STsdbCfg
*
pCfg
,
STsdbAppH
*
pAppH
)
{
STsdb
*
pRepo
;
STsdb
*
pRepo
;
...
...
source/dnode/vnode/tsdb2/src/tsdbScan.c
已删除
100644 → 0
浏览文件 @
73254132
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbint.h"
#if 0
#ifndef _TSDB_PLUGINS
int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; }
STsdbScanHandle* tsdbNewScanHandle() { return NULL; }
void tsdbSetScanLogStream(STsdbScanHandle* pScanHandle, FILE* fLogStream) {}
int tsdbSetAndOpenScanFile(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; }
int tsdbScanSBlockIdx(STsdbScanHandle* pScanHandle) { return 0; }
int tsdbScanSBlock(STsdbScanHandle* pScanHandle, int idx) { return 0; }
int tsdbCloseScanFile(STsdbScanHandle* pScanHandle) { return 0; }
void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle) {}
#endif
#endif
\ No newline at end of file
source/dnode/vnode/tsdb2/src/tsdbSync.c
已删除
100644 → 0
浏览文件 @
73254132
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tsdbint.h"
// Sync handle
typedef
struct
{
STsdb
*
pRepo
;
SRtn
rtn
;
SOCKET
socketFd
;
void
*
pBuf
;
bool
mfChanged
;
SMFile
*
pmf
;
SMFile
mf
;
SDFileSet
df
;
SDFileSet
*
pdf
;
}
SSyncH
;
#define SYNC_BUFFER(sh) ((sh)->pBuf)
static
void
tsdbInitSyncH
(
SSyncH
*
pSyncH
,
STsdb
*
pRepo
,
SOCKET
socketFd
);
static
void
tsdbDestroySyncH
(
SSyncH
*
pSyncH
);
static
int32_t
tsdbSyncSendMeta
(
SSyncH
*
pSynch
);
static
int32_t
tsdbSyncRecvMeta
(
SSyncH
*
pSynch
);
static
int32_t
tsdbSendMetaInfo
(
SSyncH
*
pSynch
);
static
int32_t
tsdbRecvMetaInfo
(
SSyncH
*
pSynch
);
static
int32_t
tsdbSendDecision
(
SSyncH
*
pSynch
,
bool
toSend
);
static
int32_t
tsdbRecvDecision
(
SSyncH
*
pSynch
,
bool
*
toSend
);
static
int32_t
tsdbSyncSendDFileSetArray
(
SSyncH
*
pSynch
);
static
int32_t
tsdbSyncRecvDFileSetArray
(
SSyncH
*
pSynch
);
static
bool
tsdbIsTowFSetSame
(
SDFileSet
*
pSet1
,
SDFileSet
*
pSet2
);
static
int32_t
tsdbSyncSendDFileSet
(
SSyncH
*
pSynch
,
SDFileSet
*
pSet
);
static
int32_t
tsdbSendDFileSetInfo
(
SSyncH
*
pSynch
,
SDFileSet
*
pSet
);
static
int32_t
tsdbRecvDFileSetInfo
(
SSyncH
*
pSynch
);
static
int
tsdbReload
(
STsdb
*
pRepo
,
bool
isMfChanged
);
int32_t
tsdbSyncSend
(
void
*
tsdb
,
SOCKET
socketFd
)
{
STsdb
*
pRepo
=
(
STsdb
*
)
tsdb
;
SSyncH
synch
=
{
0
};
tsdbInitSyncH
(
&
synch
,
pRepo
,
socketFd
);
// Disable TSDB commit
tsem_wait
(
&
(
pRepo
->
readyToCommit
));
if
(
tsdbSyncSendMeta
(
&
synch
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send metafile since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
if
(
tsdbSyncSendDFileSetArray
(
&
synch
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send filesets since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
// Enable TSDB commit
tsem_post
(
&
(
pRepo
->
readyToCommit
));
tsdbDestroySyncH
(
&
synch
);
return
0
;
_err:
tsem_post
(
&
(
pRepo
->
readyToCommit
));
tsdbDestroySyncH
(
&
synch
);
return
-
1
;
}
int32_t
tsdbSyncRecv
(
void
*
tsdb
,
SOCKET
socketFd
)
{
STsdb
*
pRepo
=
(
STsdb
*
)
tsdb
;
SSyncH
synch
=
{
0
};
pRepo
->
state
=
TSDB_STATE_OK
;
tsdbInitSyncH
(
&
synch
,
pRepo
,
socketFd
);
tsem_wait
(
&
(
pRepo
->
readyToCommit
));
tsdbStartFSTxn
(
pRepo
,
0
,
0
);
if
(
tsdbSyncRecvMeta
(
&
synch
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to recv metafile since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
if
(
tsdbSyncRecvDFileSetArray
(
&
synch
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to recv filesets since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
tsdbEndFSTxn
(
pRepo
);
tsem_post
(
&
(
pRepo
->
readyToCommit
));
tsdbDestroySyncH
(
&
synch
);
// Reload file change
tsdbReload
(
pRepo
,
synch
.
mfChanged
);
return
0
;
_err:
tsdbEndFSTxnWithError
(
REPO_FS
(
pRepo
));
tsem_post
(
&
(
pRepo
->
readyToCommit
));
tsdbDestroySyncH
(
&
synch
);
return
-
1
;
}
static
void
tsdbInitSyncH
(
SSyncH
*
pSyncH
,
STsdb
*
pRepo
,
SOCKET
socketFd
)
{
pSyncH
->
pRepo
=
pRepo
;
pSyncH
->
socketFd
=
socketFd
;
tsdbGetRtnSnap
(
pRepo
,
&
(
pSyncH
->
rtn
));
}
static
void
tsdbDestroySyncH
(
SSyncH
*
pSyncH
)
{
taosTZfree
(
pSyncH
->
pBuf
);
}
static
int32_t
tsdbSyncSendMeta
(
SSyncH
*
pSynch
)
{
STsdb
*
pRepo
=
pSynch
->
pRepo
;
bool
toSendMeta
=
false
;
SMFile
mf
;
// Send meta info to remote
tsdbInfo
(
"vgId:%d, metainfo will be sent"
,
REPO_ID
(
pRepo
));
if
(
tsdbSendMetaInfo
(
pSynch
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send metainfo since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
pRepo
->
fs
->
cstatus
->
pmf
==
NULL
)
{
// No meta file, not need to wait to retrieve meta file
tsdbInfo
(
"vgId:%d, metafile not exist, no need to send"
,
REPO_ID
(
pRepo
));
return
0
;
}
if
(
tsdbRecvDecision
(
pSynch
,
&
toSendMeta
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to recv decision while send meta since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
toSendMeta
)
{
tsdbInitMFileEx
(
&
mf
,
pRepo
->
fs
->
cstatus
->
pmf
);
if
(
tsdbOpenMFile
(
&
mf
,
O_RDONLY
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to open file while send metafile since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
int64_t
writeLen
=
mf
.
info
.
size
;
tsdbInfo
(
"vgId:%d, metafile:%s will be sent, size:%"
PRId64
,
REPO_ID
(
pRepo
),
mf
.
f
.
aname
,
writeLen
);
int64_t
ret
=
taosSendFile
(
pSynch
->
socketFd
,
TSDB_FILE_FD
(
&
mf
),
0
,
writeLen
);
if
(
ret
!=
writeLen
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbError
(
"vgId:%d, failed to send metafile since %s, ret:%"
PRId64
" writeLen:%"
PRId64
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
),
ret
,
writeLen
);
tsdbCloseMFile
(
&
mf
);
return
-
1
;
}
tsdbCloseMFile
(
&
mf
);
tsdbInfo
(
"vgId:%d, metafile is sent"
,
REPO_ID
(
pRepo
));
}
else
{
tsdbInfo
(
"vgId:%d, metafile is same, no need to send"
,
REPO_ID
(
pRepo
));
}
return
0
;
}
static
int32_t
tsdbSyncRecvMeta
(
SSyncH
*
pSynch
)
{
STsdb
*
pRepo
=
pSynch
->
pRepo
;
SMFile
*
pLMFile
=
pRepo
->
fs
->
cstatus
->
pmf
;
// Recv meta info from remote
if
(
tsdbRecvMetaInfo
(
pSynch
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to recv metainfo since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
// No meta file, do nothing (rm local meta file)
if
(
pSynch
->
pmf
==
NULL
)
{
if
(
pLMFile
==
NULL
)
{
pSynch
->
mfChanged
=
false
;
}
else
{
pSynch
->
mfChanged
=
true
;
}
tsdbInfo
(
"vgId:%d, metafile not exist in remote, no need to recv"
,
REPO_ID
(
pRepo
));
return
0
;
}
if
(
pLMFile
==
NULL
||
pSynch
->
pmf
->
info
.
size
!=
pLMFile
->
info
.
size
||
pSynch
->
pmf
->
info
.
magic
!=
pLMFile
->
info
.
magic
||
TSDB_FILE_IS_BAD
(
pLMFile
))
{
// Local has no meta file or has a different meta file, need to copy from remote
pSynch
->
mfChanged
=
true
;
if
(
tsdbSendDecision
(
pSynch
,
true
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send decision while recv metafile since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
tsdbInfo
(
"vgId:%d, metafile will be received"
,
REPO_ID
(
pRepo
));
// Recv from remote
SMFile
mf
;
SDiskID
did
=
{.
level
=
TFS_PRIMARY_LEVEL
,
.
id
=
TFS_PRIMARY_ID
};
tsdbInitMFile
(
&
mf
,
did
,
REPO_ID
(
pRepo
),
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)));
if
(
tsdbCreateMFile
(
&
mf
,
false
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to create file while recv metafile since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
tsdbInfo
(
"vgId:%d, metafile:%s is created"
,
REPO_ID
(
pRepo
),
mf
.
f
.
aname
);
int64_t
readLen
=
pSynch
->
pmf
->
info
.
size
;
int64_t
ret
=
taosCopyFds
(
pSynch
->
socketFd
,
TSDB_FILE_FD
(
&
mf
),
readLen
);
if
(
ret
!=
readLen
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbError
(
"vgId:%d, failed to recv metafile since %s, ret:%"
PRId64
" readLen:%"
PRId64
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
),
ret
,
readLen
);
tsdbCloseMFile
(
&
mf
);
tsdbRemoveMFile
(
&
mf
);
return
-
1
;
}
tsdbInfo
(
"vgId:%d, metafile is received, size:%"
PRId64
,
REPO_ID
(
pRepo
),
readLen
);
mf
.
info
=
pSynch
->
pmf
->
info
;
tsdbCloseMFile
(
&
mf
);
tsdbUpdateMFile
(
REPO_FS
(
pRepo
),
&
mf
);
}
else
{
pSynch
->
mfChanged
=
false
;
tsdbInfo
(
"vgId:%d, metafile is same, no need to recv"
,
REPO_ID
(
pRepo
));
if
(
tsdbSendDecision
(
pSynch
,
false
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send decision while recv metafile since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
tsdbUpdateMFile
(
REPO_FS
(
pRepo
),
pLMFile
);
}
return
0
;
}
static
int32_t
tsdbSendMetaInfo
(
SSyncH
*
pSynch
)
{
STsdb
*
pRepo
=
pSynch
->
pRepo
;
uint32_t
tlen
=
0
;
SMFile
*
pMFile
=
pRepo
->
fs
->
cstatus
->
pmf
;
if
(
pMFile
)
{
tlen
=
tlen
+
tsdbEncodeSMFileEx
(
NULL
,
pMFile
)
+
sizeof
(
TSCKSUM
);
}
if
(
tsdbMakeRoom
((
void
**
)(
&
SYNC_BUFFER
(
pSynch
)),
tlen
+
sizeof
(
tlen
))
<
0
)
{
tsdbError
(
"vgId:%d, failed to makeroom while send metainfo since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
void
*
ptr
=
SYNC_BUFFER
(
pSynch
);
taosEncodeFixedU32
(
&
ptr
,
tlen
);
void
*
tptr
=
ptr
;
if
(
pMFile
)
{
tsdbEncodeSMFileEx
(
&
ptr
,
pMFile
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
tptr
,
tlen
);
}
int32_t
writeLen
=
tlen
+
sizeof
(
uint32_t
);
int32_t
ret
=
taosWriteMsg
(
pSynch
->
socketFd
,
SYNC_BUFFER
(
pSynch
),
writeLen
);
if
(
ret
!=
writeLen
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbError
(
"vgId:%d, failed to send metainfo since %s, ret:%d writeLen:%d"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
),
ret
,
writeLen
);
return
-
1
;
}
tsdbInfo
(
"vgId:%d, metainfo is sent, tlen:%d, writeLen:%d"
,
REPO_ID
(
pRepo
),
tlen
,
writeLen
);
return
0
;
}
static
int32_t
tsdbRecvMetaInfo
(
SSyncH
*
pSynch
)
{
STsdb
*
pRepo
=
pSynch
->
pRepo
;
uint32_t
tlen
=
0
;
char
buf
[
64
]
=
{
0
};
int32_t
readLen
=
sizeof
(
uint32_t
);
int32_t
ret
=
taosReadMsg
(
pSynch
->
socketFd
,
buf
,
readLen
);
if
(
ret
!=
readLen
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbError
(
"vgId:%d, failed to recv metalen, ret:%d readLen:%d"
,
REPO_ID
(
pRepo
),
ret
,
readLen
);
return
-
1
;
}
taosDecodeFixedU32
(
buf
,
&
tlen
);
tsdbInfo
(
"vgId:%d, metalen is received, readLen:%d, tlen:%d"
,
REPO_ID
(
pRepo
),
readLen
,
tlen
);
if
(
tlen
==
0
)
{
pSynch
->
pmf
=
NULL
;
return
0
;
}
if
(
tsdbMakeRoom
((
void
**
)(
&
SYNC_BUFFER
(
pSynch
)),
tlen
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to makeroom while recv metainfo since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
ret
=
taosReadMsg
(
pSynch
->
socketFd
,
SYNC_BUFFER
(
pSynch
),
tlen
);
if
(
ret
!=
tlen
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbError
(
"vgId:%d, failed to recv metainfo, ret:%d tlen:%d"
,
REPO_ID
(
pRepo
),
ret
,
tlen
);
return
-
1
;
}
tsdbInfo
(
"vgId:%d, metainfo is received, tlen:%d"
,
REPO_ID
(
pRepo
),
tlen
);
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
SYNC_BUFFER
(
pSynch
),
tlen
))
{
terrno
=
TSDB_CODE_TDB_MESSED_MSG
;
tsdbError
(
"vgId:%d, failed to checksum while recv metainfo since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
pSynch
->
pmf
=
&
(
pSynch
->
mf
);
tsdbDecodeSMFileEx
(
SYNC_BUFFER
(
pSynch
),
pSynch
->
pmf
);
return
0
;
}
static
int32_t
tsdbSendDecision
(
SSyncH
*
pSynch
,
bool
toSend
)
{
STsdb
*
pRepo
=
pSynch
->
pRepo
;
uint8_t
decision
=
toSend
;
int32_t
writeLen
=
sizeof
(
uint8_t
);
int32_t
ret
=
taosWriteMsg
(
pSynch
->
socketFd
,
(
void
*
)(
&
decision
),
writeLen
);
if
(
ret
!=
writeLen
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbError
(
"vgId:%d, failed to send decison, ret:%d writeLen:%d"
,
REPO_ID
(
pRepo
),
ret
,
writeLen
);
return
-
1
;
}
return
0
;
}
static
int32_t
tsdbRecvDecision
(
SSyncH
*
pSynch
,
bool
*
toSend
)
{
STsdb
*
pRepo
=
pSynch
->
pRepo
;
uint8_t
decision
=
0
;
int32_t
readLen
=
sizeof
(
uint8_t
);
int32_t
ret
=
taosReadMsg
(
pSynch
->
socketFd
,
(
void
*
)(
&
decision
),
readLen
);
if
(
ret
!=
readLen
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbError
(
"vgId:%d, failed to recv decison, ret:%d readLen:%d"
,
REPO_ID
(
pRepo
),
ret
,
readLen
);
return
-
1
;
}
*
toSend
=
decision
;
return
0
;
}
static
int32_t
tsdbSyncSendDFileSetArray
(
SSyncH
*
pSynch
)
{
STsdb
*
pRepo
=
pSynch
->
pRepo
;
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
SFSIter
fsiter
;
SDFileSet
*
pSet
;
tsdbFSIterInit
(
&
fsiter
,
pfs
,
TSDB_FS_ITER_FORWARD
);
do
{
pSet
=
tsdbFSIterNext
(
&
fsiter
);
if
(
tsdbSyncSendDFileSet
(
pSynch
,
pSet
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send fileset:%d since %s"
,
REPO_ID
(
pRepo
),
pSet
?
pSet
->
fid
:
-
1
,
tstrerror
(
terrno
));
return
-
1
;
}
// No more file set to send, jut break
if
(
pSet
==
NULL
)
{
tsdbInfo
(
"vgId:%d, no filesets any more"
,
REPO_ID
(
pRepo
));
break
;
}
}
while
(
true
);
return
0
;
}
static
int32_t
tsdbSyncRecvDFileSetArray
(
SSyncH
*
pSynch
)
{
STsdb
*
pRepo
=
pSynch
->
pRepo
;
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
SFSIter
fsiter
;
SDFileSet
*
pLSet
;
// Local file set
tsdbFSIterInit
(
&
fsiter
,
pfs
,
TSDB_FS_ITER_FORWARD
);
pLSet
=
tsdbFSIterNext
(
&
fsiter
);
if
(
tsdbRecvDFileSetInfo
(
pSynch
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to recv fileset since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
while
(
true
)
{
if
(
pLSet
==
NULL
&&
pSynch
->
pdf
==
NULL
)
{
tsdbInfo
(
"vgId:%d, all filesets is disposed"
,
REPO_ID
(
pRepo
));
break
;
}
else
{
tsdbInfo
(
"vgId:%d, fileset local:%d remote:%d, will be disposed"
,
REPO_ID
(
pRepo
),
pLSet
!=
NULL
?
pLSet
->
fid
:
-
1
,
pSynch
->
pdf
!=
NULL
?
pSynch
->
pdf
->
fid
:
-
1
);
}
if
(
pLSet
&&
(
pSynch
->
pdf
==
NULL
||
pLSet
->
fid
<
pSynch
->
pdf
->
fid
))
{
// remote not has pLSet->fid set, just remove local (do nothing to remote the fset)
tsdbInfo
(
"vgId:%d, fileset:%d smaller than remote:%d, remove it"
,
REPO_ID
(
pRepo
),
pLSet
->
fid
,
pSynch
->
pdf
!=
NULL
?
pSynch
->
pdf
->
fid
:
-
1
);
pLSet
=
tsdbFSIterNext
(
&
fsiter
);
}
else
{
if
(
pLSet
&&
pSynch
->
pdf
&&
pLSet
->
fid
==
pSynch
->
pdf
->
fid
&&
tsdbIsTowFSetSame
(
pLSet
,
pSynch
->
pdf
)
&&
tsdbFSetIsOk
(
pLSet
))
{
// Just keep local files and notify remote not to send
tsdbInfo
(
"vgId:%d, fileset:%d is same and no need to recv"
,
REPO_ID
(
pRepo
),
pLSet
->
fid
);
if
(
tsdbUpdateDFileSet
(
pfs
,
pLSet
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to update fileset since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
tsdbSendDecision
(
pSynch
,
false
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
}
else
{
// Need to copy from remote
int
fidLevel
=
tsdbGetFidLevel
(
pSynch
->
pdf
->
fid
,
&
(
pSynch
->
rtn
));
if
(
fidLevel
<
0
)
{
// expired fileset
tsdbInfo
(
"vgId:%d, fileset:%d will be skipped as expired"
,
REPO_ID
(
pRepo
),
pSynch
->
pdf
->
fid
);
if
(
tsdbSendDecision
(
pSynch
,
false
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
// Move forward
if
(
tsdbRecvDFileSetInfo
(
pSynch
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to recv fileset since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
pLSet
)
{
pLSet
=
tsdbFSIterNext
(
&
fsiter
);
}
// Next loop
continue
;
}
else
{
tsdbInfo
(
"vgId:%d, fileset:%d will be received"
,
REPO_ID
(
pRepo
),
pSynch
->
pdf
->
fid
);
// Notify remote to send there file here
if
(
tsdbSendDecision
(
pSynch
,
true
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send decision since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
}
// Create local files and copy from remote
SDiskID
did
;
SDFileSet
fset
;
tfsAllocDisk
(
fidLevel
,
&
(
did
.
level
),
&
(
did
.
id
));
if
(
did
.
level
==
TFS_UNDECIDED_LEVEL
)
{
terrno
=
TSDB_CODE_TDB_NO_AVAIL_DISK
;
tsdbError
(
"vgId:%d, failed allc disk since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
tsdbInitDFileSet
(
&
fset
,
did
,
REPO_ID
(
pRepo
),
pSynch
->
pdf
->
fid
,
FS_TXN_VERSION
(
pfs
),
pSynch
->
pdf
->
ver
);
// Create new FSET
if
(
tsdbCreateDFileSet
(
&
fset
,
false
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to create fileset since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSynch
->
pdf
);
ftype
++
)
{
SDFile
*
pDFile
=
TSDB_DFILE_IN_SET
(
&
fset
,
ftype
);
// local file
SDFile
*
pRDFile
=
TSDB_DFILE_IN_SET
(
pSynch
->
pdf
,
ftype
);
// remote file
tsdbInfo
(
"vgId:%d, file:%s will be received, osize:%"
PRIu64
" rsize:%"
PRIu64
,
REPO_ID
(
pRepo
),
pDFile
->
f
.
aname
,
pDFile
->
info
.
size
,
pRDFile
->
info
.
size
);
int64_t
writeLen
=
pRDFile
->
info
.
size
;
int64_t
ret
=
taosCopyFds
(
pSynch
->
socketFd
,
pDFile
->
fd
,
writeLen
);
if
(
ret
!=
writeLen
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbError
(
"vgId:%d, failed to recv file:%s since %s, ret:%"
PRId64
" writeLen:%"
PRId64
,
REPO_ID
(
pRepo
),
pDFile
->
f
.
aname
,
tstrerror
(
terrno
),
ret
,
writeLen
);
tsdbCloseDFileSet
(
&
fset
);
tsdbRemoveDFileSet
(
&
fset
);
return
-
1
;
}
// Update new file info
pDFile
->
info
=
pRDFile
->
info
;
tsdbInfo
(
"vgId:%d, file:%s is received, size:%"
PRId64
,
REPO_ID
(
pRepo
),
pDFile
->
f
.
aname
,
writeLen
);
}
tsdbCloseDFileSet
(
&
fset
);
if
(
tsdbUpdateDFileSet
(
pfs
,
&
fset
)
<
0
)
{
tsdbInfo
(
"vgId:%d, fileset:%d failed to update since %s"
,
REPO_ID
(
pRepo
),
fset
.
fid
,
tstrerror
(
terrno
));
return
-
1
;
}
tsdbInfo
(
"vgId:%d, fileset:%d is received"
,
REPO_ID
(
pRepo
),
pSynch
->
pdf
->
fid
);
}
// Move forward
if
(
tsdbRecvDFileSetInfo
(
pSynch
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to recv fileset since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
if
(
pLSet
)
{
pLSet
=
tsdbFSIterNext
(
&
fsiter
);
}
}
#if 0
if (pLSet == NULL) {
// Copy from remote >>>>>>>>>>>
} else {
if (pSynch->pdf == NULL) {
// Remove local file, just ignore ++++++++++++++
pLSet = tsdbFSIterNext(&fsiter);
} else {
if (pLSet->fid < pSynch->pdf->fid) {
// Remove local file, just ignore ++++++++++++
pLSet = tsdbFSIterNext(&fsiter);
} else if (pLSet->fid > pSynch->pdf->fid){
// Copy from remote >>>>>>>>>>>>>>
if (tsdbRecvDFileSetInfo(pSynch) < 0) {
// TODO
return -1;
}
} else {
if (true/*TODO: is same fset*/) {
// No need to copy ---------------------
} else {
// copy from remote >>>>>>>>>>>>>.
}
}
}
}
#endif
}
return
0
;
}
static
bool
tsdbIsTowFSetSame
(
SDFileSet
*
pSet1
,
SDFileSet
*
pSet2
)
{
if
(
pSet1
->
ver
!=
pSet2
->
ver
)
{
return
false
;
}
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet1
);
ftype
++
)
{
SDFile
*
pDFile1
=
TSDB_DFILE_IN_SET
(
pSet1
,
ftype
);
SDFile
*
pDFile2
=
TSDB_DFILE_IN_SET
(
pSet2
,
ftype
);
if
(
pDFile1
->
info
.
size
!=
pDFile2
->
info
.
size
||
pDFile1
->
info
.
magic
!=
pDFile2
->
info
.
magic
)
{
return
false
;
}
}
return
true
;
}
static
int32_t
tsdbSyncSendDFileSet
(
SSyncH
*
pSynch
,
SDFileSet
*
pSet
)
{
STsdb
*
pRepo
=
pSynch
->
pRepo
;
bool
toSend
=
false
;
// skip expired fileset
if
(
pSet
&&
tsdbGetFidLevel
(
pSet
->
fid
,
&
(
pSynch
->
rtn
))
<
0
)
{
tsdbInfo
(
"vgId:%d, don't sync send since fileset:%d smaller than minFid:%d"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
pSynch
->
rtn
.
minFid
);
return
0
;
}
if
(
tsdbSendDFileSetInfo
(
pSynch
,
pSet
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to send fileset:%d info since %s"
,
REPO_ID
(
pRepo
),
pSet
?
pSet
->
fid
:
-
1
,
tstrerror
(
terrno
));
return
-
1
;
}
// No file any more, no need to send file, just return
if
(
pSet
==
NULL
)
{
return
0
;
}
if
(
tsdbRecvDecision
(
pSynch
,
&
toSend
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to recv decision while send fileset:%d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
return
-
1
;
}
if
(
toSend
)
{
tsdbInfo
(
"vgId:%d, fileset:%d will be sent"
,
REPO_ID
(
pRepo
),
pSet
->
fid
);
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
tsdbGetNFiles
(
pSet
);
ftype
++
)
{
SDFile
df
=
*
TSDB_DFILE_IN_SET
(
pSet
,
ftype
);
if
(
tsdbOpenDFile
(
&
df
,
O_RDONLY
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to file:%s since %s"
,
REPO_ID
(
pRepo
),
df
.
f
.
aname
,
tstrerror
(
terrno
));
return
-
1
;
}
int64_t
writeLen
=
df
.
info
.
size
;
tsdbInfo
(
"vgId:%d, file:%s will be sent, size:%"
PRId64
,
REPO_ID
(
pRepo
),
df
.
f
.
aname
,
writeLen
);
int64_t
ret
=
taosSendFile
(
pSynch
->
socketFd
,
TSDB_FILE_FD
(
&
df
),
0
,
writeLen
);
if
(
ret
!=
writeLen
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbError
(
"vgId:%d, failed to send file:%s since %s, ret:%"
PRId64
" writeLen:%"
PRId64
,
REPO_ID
(
pRepo
),
df
.
f
.
aname
,
tstrerror
(
terrno
),
ret
,
writeLen
);
tsdbCloseDFile
(
&
df
);
return
-
1
;
}
tsdbInfo
(
"vgId:%d, file:%s is sent"
,
REPO_ID
(
pRepo
),
df
.
f
.
aname
);
tsdbCloseDFile
(
&
df
);
}
tsdbInfo
(
"vgId:%d, fileset:%d is sent"
,
REPO_ID
(
pRepo
),
pSet
->
fid
);
}
else
{
tsdbInfo
(
"vgId:%d, fileset:%d is same, no need to send"
,
REPO_ID
(
pRepo
),
pSet
->
fid
);
}
return
0
;
}
static
int32_t
tsdbSendDFileSetInfo
(
SSyncH
*
pSynch
,
SDFileSet
*
pSet
)
{
STsdb
*
pRepo
=
pSynch
->
pRepo
;
uint32_t
tlen
=
0
;
if
(
pSet
)
{
tlen
=
tsdbEncodeDFileSetEx
(
NULL
,
pSet
)
+
sizeof
(
TSCKSUM
);
}
if
(
tsdbMakeRoom
((
void
**
)(
&
SYNC_BUFFER
(
pSynch
)),
tlen
+
sizeof
(
tlen
))
<
0
)
{
tsdbError
(
"vgId:%d, failed to makeroom while send fileinfo since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
void
*
ptr
=
SYNC_BUFFER
(
pSynch
);
taosEncodeFixedU32
(
&
ptr
,
tlen
);
void
*
tptr
=
ptr
;
if
(
pSet
)
{
tsdbEncodeDFileSetEx
(
&
ptr
,
pSet
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
tptr
,
tlen
);
}
int32_t
writeLen
=
tlen
+
sizeof
(
uint32_t
);
int32_t
ret
=
taosWriteMsg
(
pSynch
->
socketFd
,
SYNC_BUFFER
(
pSynch
),
writeLen
);
if
(
ret
!=
writeLen
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbError
(
"vgId:%d, failed to send fileinfo, ret:%d writeLen:%d"
,
REPO_ID
(
pRepo
),
ret
,
writeLen
);
return
-
1
;
}
return
0
;
}
static
int32_t
tsdbRecvDFileSetInfo
(
SSyncH
*
pSynch
)
{
STsdb
*
pRepo
=
pSynch
->
pRepo
;
uint32_t
tlen
;
char
buf
[
64
]
=
{
0
};
int32_t
readLen
=
sizeof
(
uint32_t
);
int32_t
ret
=
taosReadMsg
(
pSynch
->
socketFd
,
buf
,
readLen
);
if
(
ret
!=
readLen
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
taosDecodeFixedU32
(
buf
,
&
tlen
);
tsdbInfo
(
"vgId:%d, fileinfo len:%d is received"
,
REPO_ID
(
pRepo
),
tlen
);
if
(
tlen
==
0
)
{
pSynch
->
pdf
=
NULL
;
return
0
;
}
if
(
tsdbMakeRoom
((
void
**
)(
&
SYNC_BUFFER
(
pSynch
)),
tlen
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to makeroom while recv fileinfo since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
ret
=
taosReadMsg
(
pSynch
->
socketFd
,
SYNC_BUFFER
(
pSynch
),
tlen
);
if
(
ret
!=
tlen
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tsdbError
(
"vgId:%d, failed to recv fileinfo, ret:%d readLen:%d"
,
REPO_ID
(
pRepo
),
ret
,
tlen
);
return
-
1
;
}
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
SYNC_BUFFER
(
pSynch
),
tlen
))
{
terrno
=
TSDB_CODE_TDB_MESSED_MSG
;
tsdbError
(
"vgId:%d, failed to checksum while recv fileinfo since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
pSynch
->
pdf
=
&
(
pSynch
->
df
);
tsdbDecodeDFileSetEx
(
SYNC_BUFFER
(
pSynch
),
pSynch
->
pdf
);
return
0
;
}
static
int
tsdbReload
(
STsdb
*
pRepo
,
bool
isMfChanged
)
{
// TODO: may need to stop and restart stream
// if (isMfChanged) {
tsdbCloseMeta
(
pRepo
);
tsdbFreeMeta
(
pRepo
->
tsdbMeta
);
pRepo
->
tsdbMeta
=
tsdbNewMeta
(
REPO_CFG
(
pRepo
));
tsdbOpenMeta
(
pRepo
);
tsdbLoadMetaCache
(
pRepo
,
true
);
// }
tsdbUnRefMemTable
(
pRepo
,
pRepo
->
mem
);
tsdbUnRefMemTable
(
pRepo
,
pRepo
->
imem
);
pRepo
->
mem
=
NULL
;
pRepo
->
imem
=
NULL
;
if
(
tsdbRestoreInfo
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to restore info from file since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
return
0
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录