Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a04b6dba
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a04b6dba
编写于
1月 10, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more
上级
041c977c
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
30 addition
and
462 deletion
+30
-462
include/dnode/vnode/tsdb2/tsdb.h
include/dnode/vnode/tsdb2/tsdb.h
+0
-433
include/dnode/vnode/vnode.h
include/dnode/vnode/vnode.h
+1
-1
source/dnode/mgmt/impl/src/dndVnodes.c
source/dnode/mgmt/impl/src/dndVnodes.c
+23
-23
source/dnode/vnode/impl/src/vnodeMain.c
source/dnode/vnode/impl/src/vnodeMain.c
+5
-4
source/dnode/vnode/tsdb/src/tsdbCommit.c
source/dnode/vnode/tsdb/src/tsdbCommit.c
+1
-1
未找到文件。
include/dnode/vnode/tsdb2/tsdb.h
已删除
100644 → 0
浏览文件 @
041c977c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_H_
#define _TD_TSDB_H_
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include "common.h"
#include "taosdef.h"
#include "tarray.h"
#include "tdataformat.h"
#include "thash.h"
#include "tlist.h"
#include "tlockfree.h"
#include "tmsg.h"
#include "tname.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#define TSDB_VERSION_MAJOR 1
#define TSDB_VERSION_MINOR 0
#define TSDB_INVALID_SUPER_TABLE_ID -1
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
#define TSDB_STATUS_COMMIT_NOBLOCK 3 // commit no block, need to be solved
// TSDB STATE DEFINITION
#define TSDB_STATE_OK 0x0
#define TSDB_STATE_BAD_META 0x1
#define TSDB_STATE_BAD_DATA 0x2
typedef
struct
SDataStatis
{
int16_t
colId
;
int64_t
sum
;
int64_t
max
;
int64_t
min
;
int16_t
maxIndex
;
int16_t
minIndex
;
int16_t
numOfNull
;
}
SDataStatis
;
// --------- TSDB APPLICATION HANDLE DEFINITION
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
typedef
struct
{
int32_t
tsdbId
;
int32_t
cacheBlockSize
;
int32_t
totalBlocks
;
int32_t
daysPerFile
;
// day per file sharding policy
int32_t
keep
;
// day of data to keep
int32_t
keep1
;
int32_t
keep2
;
int32_t
lruCacheSize
;
int32_t
minRowsPerFileBlock
;
// minimum rows per file block
int32_t
maxRowsPerFileBlock
;
// maximum rows per file block
int8_t
precision
;
int8_t
compression
;
int8_t
update
;
int8_t
cacheLastRow
;
// 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2
}
STsdbCfg
;
#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0)
#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0)
#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0)
// --------- TSDB REPOSITORY USAGE STATISTICS
typedef
struct
{
int64_t
totalStorage
;
// total bytes occupie
int64_t
compStorage
;
int64_t
pointsWritten
;
// total data points written
}
STsdbStat
;
typedef
struct
STsdb
STsdb
;
STsdbCfg
*
tsdbGetCfg
(
const
STsdb
*
repo
);
// --------- TSDB REPOSITORY DEFINITION
// int32_t tsdbCreateRepo(int repoid);
// int32_t tsdbDropRepo(int repoid);
STsdb
*
tsdbOpen
(
STsdbCfg
*
pCfg
,
STsdbAppH
*
pAppH
);
int
tsdbClose
(
STsdb
*
repo
,
int
toCommit
);
int32_t
tsdbConfigRepo
(
STsdb
*
repo
,
STsdbCfg
*
pCfg
);
int
tsdbGetState
(
STsdb
*
repo
);
int8_t
tsdbGetCompactState
(
STsdb
*
repo
);
// --------- TSDB TABLE DEFINITION
typedef
struct
{
uint64_t
uid
;
// the unique table ID
int32_t
tid
;
// the table ID in the repository.
}
STableId
;
// --------- TSDB TABLE configuration
typedef
struct
{
ETableType
type
;
char
*
name
;
STableId
tableId
;
int32_t
sversion
;
char
*
sname
;
// super table name
uint64_t
superUid
;
STSchema
*
schema
;
STSchema
*
tagSchema
;
SKVRow
tagValues
;
char
*
sql
;
}
STableCfg
;
void
tsdbClearTableCfg
(
STableCfg
*
config
);
void
*
tsdbGetTableTagVal
(
const
void
*
pTable
,
int32_t
colId
,
int16_t
type
);
char
*
tsdbGetTableName
(
void
*
pTable
);
#define TSDB_TABLEID(_table) ((STableId *)(_table))
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
STableCfg
*
tsdbCreateTableCfgFromMsg
(
SMDCreateTableMsg
*
pMsg
);
int
tsdbCreateTable
(
STsdb
*
repo
,
STableCfg
*
pCfg
);
int
tsdbDropTable
(
STsdb
*
pRepo
,
STableId
tableId
);
int
tsdbUpdateTableTagValue
(
STsdb
*
repo
,
SUpdateTableTagValMsg
*
pMsg
);
uint32_t
tsdbGetFileInfo
(
STsdb
*
repo
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
);
// the TSDB repository info
typedef
struct
STsdbRepoInfo
{
STsdbCfg
tsdbCfg
;
uint64_t
version
;
// version of the repository
int64_t
tsdbTotalDataSize
;
// the original inserted data size
int64_t
tsdbTotalDiskSize
;
// the total disk size taken by this TSDB repository
// TODO: Other informations to add
}
STsdbRepoInfo
;
STsdbRepoInfo
*
tsdbGetStatus
(
STsdb
*
pRepo
);
// the meter information report structure
typedef
struct
{
STableCfg
tableCfg
;
uint64_t
version
;
int64_t
tableTotalDataSize
;
// In bytes
int64_t
tableTotalDiskSize
;
// In bytes
}
STableInfo
;
// -- FOR INSERT DATA
/**
* Insert data to a table in a repository
* @param pRepo the TSDB repository handle
* @param pData the data to insert (will give a more specific description)
*
* @return the number of points inserted, -1 for failure and the error number is set
*/
int32_t
tsdbInsertData
(
STsdb
*
repo
,
SSubmitMsg
*
pMsg
,
SShellSubmitRspMsg
*
pRsp
);
// -- FOR QUERY TIME SERIES DATA
typedef
void
*
TsdbQueryHandleT
;
// Use void to hide implementation details
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
// query condition to build multi-table data block iterator
typedef
struct
STsdbQueryCond
{
STimeWindow
twindow
;
int32_t
order
;
// desc|asc order to iterate the data block
int64_t
offset
;
// skip offset put down to tsdb
int32_t
numOfCols
;
SColumnInfo
*
colList
;
bool
loadExternalRows
;
// load external rows or not
int32_t
type
;
// data block load type:
}
STsdbQueryCond
;
typedef
struct
STableData
STableData
;
typedef
struct
{
T_REF_DECLARE
()
SRWLatch
latch
;
TSKEY
keyFirst
;
TSKEY
keyLast
;
int64_t
numOfRows
;
int32_t
maxTables
;
STableData
**
tData
;
SList
*
actList
;
SList
*
extraBuffList
;
SList
*
bufBlockList
;
int64_t
pointsAdd
;
// TODO
int64_t
storageAdd
;
// TODO
}
SMemTable
;
typedef
struct
{
SMemTable
*
mem
;
SMemTable
*
imem
;
SMemTable
mtable
;
SMemTable
*
omem
;
}
SMemSnapshot
;
typedef
struct
SMemRef
{
int32_t
ref
;
SMemSnapshot
snapshot
;
}
SMemRef
;
#if 0
typedef struct SFileBlockInfo {
int32_t numBlocksOfStep;
} SFileBlockInfo;
typedef struct {
void *pTable;
TSKEY lastKey;
} STableKeyInfo;
typedef struct {
uint32_t numOfTables;
SArray * pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
#define TSDB_BLOCK_DIST_STEP_ROWS 16
typedef struct {
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
uint64_t totalRows;
int32_t maxRows;
int32_t minRows;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
uint32_t numOfSmallBlocks;
SArray * dataBlockInfos;
} STableBlockDist;
/**
* Get the data block iterator, starting from position according to the query condition
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
* group by condition
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
SMemRef *pRef);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
* Note that only one data block with only row will be returned while invoking retrieve data block function for
* all tables in this group.
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfo table list.
* @return
*/
TsdbQueryHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
SMemRef *pRef);
TsdbQueryHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId,
SMemRef *pMemRef);
bool isTsdbCacheLastRow(TsdbQueryHandleT *pQueryHandle);
/**
* get the queried table object list
* @param pHandle
* @return
*/
SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
/**
* get the group list according to table id from client
* @param tsdb
* @param pCond
* @param groupList
* @param qinfo
* @return
*/
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
uint64_t qId, SMemRef *pRef);
/**
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT *pHandle);
/**
* move to next block if exists
*
* @param pQueryHandle
* @return
*/
bool tsdbNextDataBlock(TsdbQueryHandleT pQueryHandle);
/**
* Get current data block information
*
* @param pQueryHandle
* @param pBlockInfo
* @return
*/
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *pBlockInfo);
/**
*
* Get the pre-calculated information w.r.t. current data block.
*
* In case of data block in cache, the pBlockStatis will always be NULL.
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataStatis **pBlockStatis);
/**
*
* The query condition with primary timestamp is passed to iterator during its constructor function,
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
*
* @param pQueryHandle query handle
* @param pColumnIdList required data columns id list
* @return
*/
SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList);
/**
* Get the qualified table id for a super table according to the tag query expression.
* @param stableid. super table sid
* @param pTagCond. tag query condition
*/
int32_t tsdbQuerySTableByTagCond(STsdb *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len,
STableGroupInfo *pGroupList, SColIndex *pColIndex, int32_t numOfCols);
/**
* destroy the created table group list, which is generated by tag query
* @param pGroupList
*/
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
/**
* create the table group result including only one table, used to handle the normal table query
*
* @param tsdb tsdbHandle
* @param uid table uid
* @param pGroupInfo the generated result
* @return
*/
int32_t tsdbGetOneTableGroup(STsdb *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
/**
*
* @param tsdb
* @param pTableIdList
* @param pGroupInfo
* @return
*/
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
/**
* clean up the query handle
* @param queryHandle
*/
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo *groupList);
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT *queryHandle, STableBlockDist *pTableBlockInfo);
// obtain queryHandle attribute
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle);
/**
* get the statistics of repo usage
* @param repo. point to the tsdbrepo
* @param totalPoints. total data point written
* @param totalStorage. total bytes took by the tsdb
* @param compStorage. total bytes took by the tsdb after compressed
*/
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage);
int tsdbInitCommitQueue();
void tsdbDestroyCommitQueue();
int tsdbSyncCommit(STsdb *repo);
void tsdbIncCommitRef(int vgId);
void tsdbDecCommitRef(int vgId);
void tsdbSwitchTable(TsdbQueryHandleT pQueryHandle);
// For TSDB file sync
int tsdbSyncSend(void *pRepo, SOCKET socketFd);
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// // For TSDB Compact
// int tsdbCompact(STsdb *pRepo);
// For TSDB Health Monitor
// // no problem return true
// bool tsdbNoProblem(STsdb *pRepo);
// // unit of walSize: MB
// int tsdbCheckWal(STsdb *pRepo, uint32_t walSize);
// // for json tag
// void *getJsonTagValueElment(void *data, char *key, int32_t keyLen, char *out, int16_t bytes);
// void getJsonTagValueAll(void *data, void *dst, int16_t bytes);
// char *parseTagDatatoJson(void *p);
#endif
#ifdef __cplusplus
}
#endif
#endif // _TD_TSDB_H_
include/dnode/vnode/vnode.h
浏览文件 @
a04b6dba
...
...
@@ -87,7 +87,7 @@ void vnodeClear();
* @param pVnodeCfg options of the vnode
* @return SVnode* The vnode object
*/
SVnode
*
vnodeOpen
(
const
char
*
path
,
const
SVnodeCfg
*
pVnodeCfg
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
const
SVnodeCfg
*
pVnodeCfg
,
int32_t
vid
);
/**
* @brief Close a VNODE
...
...
source/dnode/mgmt/impl/src/dndVnodes.c
浏览文件 @
a04b6dba
...
...
@@ -33,14 +33,14 @@ typedef struct {
int8_t
dropped
;
int8_t
accessState
;
uint64_t
dbUid
;
char
*
db
;
char
*
path
;
SVnode
*
pImpl
;
char
*
db
;
char
*
path
;
SVnode
*
pImpl
;
STaosQueue
*
pWriteQ
;
STaosQueue
*
pSyncQ
;
STaosQueue
*
pApplyQ
;
STaosQueue
*
pQueryQ
;
STaosQueue
*
pFetchQ
;
STaosQueue
*
pFetchQ
;
}
SVnodeObj
;
typedef
struct
{
...
...
@@ -49,7 +49,7 @@ typedef struct {
int32_t
failed
;
int32_t
threadIndex
;
pthread_t
thread
;
SDnode
*
pDnode
;
SDnode
*
pDnode
;
SWrapperCfg
*
pCfgs
;
}
SVnodeThread
;
...
...
@@ -81,7 +81,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pE
void
dndProcessVnodeSyncMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
dndPutMsgIntoVnodeApplyQueue
(
SDnode
*
pDnode
,
int32_t
vgId
,
SRpcMsg
*
pMsg
);
static
SVnodeObj
*
dndAcquireVnode
(
SDnode
*
pDnode
,
int32_t
vgId
);
static
SVnodeObj
*
dndAcquireVnode
(
SDnode
*
pDnode
,
int32_t
vgId
);
static
void
dndReleaseVnode
(
SDnode
*
pDnode
,
SVnodeObj
*
pVnode
);
static
int32_t
dndOpenVnode
(
SDnode
*
pDnode
,
SWrapperCfg
*
pCfg
,
SVnode
*
pImpl
);
static
void
dndCloseVnode
(
SDnode
*
pDnode
,
SVnodeObj
*
pVnode
);
...
...
@@ -94,7 +94,7 @@ static void dndCloseVnodes(SDnode *pDnode);
static
SVnodeObj
*
dndAcquireVnode
(
SDnode
*
pDnode
,
int32_t
vgId
)
{
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodeObj
*
pVnode
=
NULL
;
SVnodeObj
*
pVnode
=
NULL
;
int32_t
refCount
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
...
...
@@ -127,7 +127,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
static
int32_t
dndOpenVnode
(
SDnode
*
pDnode
,
SWrapperCfg
*
pCfg
,
SVnode
*
pImpl
)
{
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodeObj
*
pVnode
=
calloc
(
1
,
sizeof
(
SVnodeObj
));
SVnodeObj
*
pVnode
=
calloc
(
1
,
sizeof
(
SVnodeObj
));
if
(
pVnode
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -197,7 +197,7 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
dndFreeVnodeWriteQueue
(
pDnode
,
pVnode
);
dndFreeVnodeApplyQueue
(
pDnode
,
pVnode
);
dndFreeVnodeSyncQueue
(
pDnode
,
pVnode
);
vnodeClose
(
pVnode
->
pImpl
);
pVnode
->
pImpl
=
NULL
;
...
...
@@ -217,7 +217,7 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
void
*
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
NULL
);
while
(
pIter
)
{
SVnodeObj
**
ppVnode
=
pIter
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
if
(
pVnode
&&
num
<
size
)
{
int32_t
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
dTrace
(
"vgId:%d, acquire vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
...
...
@@ -239,9 +239,9 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_
int32_t
code
=
TSDB_CODE_DND_VNODE_READ_FILE_ERROR
;
int32_t
len
=
0
;
int32_t
maxLen
=
30000
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
char
file
[
PATH_MAX
+
20
]
=
{
0
};
SWrapperCfg
*
pCfgs
=
NULL
;
...
...
@@ -286,7 +286,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_
}
for
(
int32_t
i
=
0
;
i
<
vnodesNum
;
++
i
)
{
cJSON
*
vnode
=
cJSON_GetArrayItem
(
vnodes
,
i
);
cJSON
*
vnode
=
cJSON_GetArrayItem
(
vnodes
,
i
);
SWrapperCfg
*
pCfg
=
&
pCfgs
[
i
];
cJSON
*
vgId
=
cJSON_GetObjectItem
(
vnode
,
"vgId"
);
...
...
@@ -356,7 +356,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
int32_t
len
=
0
;
int32_t
maxLen
=
65536
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
vnodes
\"
: [
\n
"
);
...
...
@@ -398,8 +398,8 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
static
void
*
dnodeOpenVnodeFunc
(
void
*
param
)
{
SVnodeThread
*
pThread
=
param
;
SDnode
*
pDnode
=
pThread
->
pDnode
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SDnode
*
pDnode
=
pThread
->
pDnode
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
dDebug
(
"thread:%d, start to open %d vnodes"
,
pThread
->
threadIndex
,
pThread
->
vnodeNum
);
setThreadName
(
"open-vnodes"
);
...
...
@@ -412,7 +412,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
pMgmt
->
openVnodes
,
pMgmt
->
totalVnodes
);
dndReportStartup
(
pDnode
,
"open-vnodes"
,
stepDesc
);
SVnode
*
pImpl
=
vnodeOpen
(
pCfg
->
path
,
NULL
);
SVnode
*
pImpl
=
vnodeOpen
(
pCfg
->
path
,
NULL
,
pCfg
->
vgId
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode by thread:%d"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
pThread
->
failed
++
;
...
...
@@ -610,7 +610,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) {
return
0
;
}
SVnode
*
pImpl
=
vnodeOpen
(
wrapperCfg
.
path
,
NULL
/*pCfg*/
);
SVnode
*
pImpl
=
vnodeOpen
(
wrapperCfg
.
path
,
NULL
/*pCfg*/
,
pCreate
->
vgId
);
if
(
pImpl
==
NULL
)
{
return
-
1
;
}
...
...
@@ -1016,7 +1016,7 @@ static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) {
}
static
int32_t
dndInitVnodeWriteWorker
(
SDnode
*
pDnode
)
{
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SMWorkerPool
*
pPool
=
&
pMgmt
->
writePool
;
pPool
->
name
=
"vnode-write"
;
pPool
->
max
=
pDnode
->
opt
.
numOfCores
;
...
...
@@ -1056,7 +1056,7 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) {
int32_t
maxThreads
=
pDnode
->
opt
.
numOfCores
/
2
;
if
(
maxThreads
<
1
)
maxThreads
=
1
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SMWorkerPool
*
pPool
=
&
pMgmt
->
syncPool
;
pPool
->
name
=
"vnode-sync"
;
pPool
->
max
=
maxThreads
;
...
...
@@ -1118,12 +1118,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) {
pLoads
->
num
=
taosHashGetSize
(
pMgmt
->
hash
);
int32_t
v
=
0
;
void
*
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
NULL
);
void
*
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
NULL
);
while
(
pIter
)
{
SVnodeObj
**
ppVnode
=
pIter
;
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
continue
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
SVnodeLoad
*
pLoad
=
&
pLoads
->
data
[
v
++
];
vnodeGetLoad
(
pVnode
->
pImpl
,
pLoad
);
...
...
source/dnode/vnode/impl/src/vnodeMain.c
浏览文件 @
a04b6dba
...
...
@@ -15,12 +15,12 @@
#include "vnodeDef.h"
static
SVnode
*
vnodeNew
(
const
char
*
path
,
const
SVnodeCfg
*
pVnodeCfg
);
static
SVnode
*
vnodeNew
(
const
char
*
path
,
const
SVnodeCfg
*
pVnodeCfg
,
int32_t
vid
);
static
void
vnodeFree
(
SVnode
*
pVnode
);
static
int
vnodeOpenImpl
(
SVnode
*
pVnode
);
static
void
vnodeCloseImpl
(
SVnode
*
pVnode
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
const
SVnodeCfg
*
pVnodeCfg
)
{
SVnode
*
vnodeOpen
(
const
char
*
path
,
const
SVnodeCfg
*
pVnodeCfg
,
int32_t
vid
)
{
SVnode
*
pVnode
=
NULL
;
// Set default options
...
...
@@ -35,7 +35,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
}
// Create the handle
pVnode
=
vnodeNew
(
path
,
pVnodeCfg
);
pVnode
=
vnodeNew
(
path
,
pVnodeCfg
,
vid
);
if
(
pVnode
==
NULL
)
{
// TODO: handle error
return
NULL
;
...
...
@@ -62,7 +62,7 @@ void vnodeClose(SVnode *pVnode) {
void
vnodeDestroy
(
const
char
*
path
)
{
taosRemoveDir
(
path
);
}
/* ------------------------ STATIC METHODS ------------------------ */
static
SVnode
*
vnodeNew
(
const
char
*
path
,
const
SVnodeCfg
*
pVnodeCfg
)
{
static
SVnode
*
vnodeNew
(
const
char
*
path
,
const
SVnodeCfg
*
pVnodeCfg
,
int32_t
vid
)
{
SVnode
*
pVnode
=
NULL
;
pVnode
=
(
SVnode
*
)
calloc
(
1
,
sizeof
(
*
pVnode
));
...
...
@@ -71,6 +71,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
return
NULL
;
}
pVnode
->
vgId
=
vid
;
pVnode
->
path
=
strdup
(
path
);
vnodeOptionsCopy
(
&
(
pVnode
->
config
),
pVnodeCfg
);
...
...
source/dnode/vnode/tsdb/src/tsdbCommit.c
浏览文件 @
a04b6dba
...
...
@@ -351,7 +351,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
}
// Loop to commit each table data
for
(
int
tid
=
1
;
tid
<
pCommith
->
niters
;
tid
++
)
{
for
(
int
tid
=
0
;
tid
<
pCommith
->
niters
;
tid
++
)
{
SCommitIter
*
pIter
=
pCommith
->
iters
+
tid
;
if
(
pIter
->
pTable
==
NULL
)
continue
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录