Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
21333a9c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
21333a9c
编写于
5月 02, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/vnode_refact1
上级
9d0d5295
a5b39499
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
112 addition
and
57 deletion
+112
-57
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+1
-0
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+11
-9
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+71
-42
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+21
-3
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+7
-2
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
21333a9c
...
...
@@ -186,6 +186,7 @@ struct STsdbFS {
#define REPO_ID(r) TD_VID((r)->pVnode)
#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg)
#define REPO_LEVEL(r) ((r)->level)
#define REPO_FS(r) ((r)->fs)
#define REPO_META(r) ((r)->pVnode->pMeta)
#define REPO_TFS(r) ((r)->pVnode->pTfs)
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
21333a9c
...
...
@@ -15,6 +15,8 @@
#include "tsdb.h"
extern
const
char
*
TSDB_LEVEL_DNAME
[];
typedef
enum
{
TSDB_TXN_TEMP_FILE
=
0
,
TSDB_TXN_CURR_FILE
}
TSDB_TXN_FILE_T
;
static
const
char
*
tsdbTxnFname
[]
=
{
"current.t"
,
"current"
};
#define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3)
...
...
@@ -35,12 +37,12 @@ static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
// static int tsdbProcessExpiredFS(STsdb *pRepo);
// static int tsdbCreateMeta(STsdb *pRepo);
static
void
tsdbGetRootDir
(
int
repoid
,
char
dirName
[])
{
snprintf
(
dirName
,
TSDB_FILENAME_LEN
,
"vnode/vnode%d/
tsdb"
,
repoid
);
static
void
tsdbGetRootDir
(
int
repoid
,
int8_t
level
,
char
dirName
[])
{
snprintf
(
dirName
,
TSDB_FILENAME_LEN
,
"vnode/vnode%d/
%s"
,
repoid
,
TSDB_LEVEL_DNAME
[
level
]
);
}
static
void
tsdbGetDataDir
(
int
repoid
,
char
dirName
[])
{
snprintf
(
dirName
,
TSDB_FILENAME_LEN
,
"vnode/vnode%d/
tsdb/data"
,
repoid
);
static
void
tsdbGetDataDir
(
int
repoid
,
int8_t
level
,
char
dirName
[])
{
snprintf
(
dirName
,
TSDB_FILENAME_LEN
,
"vnode/vnode%d/
%s/data"
,
repoid
,
TSDB_LEVEL_DNAME
[
level
]
);
}
// For backward compatibility
...
...
@@ -588,8 +590,8 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
}
static
void
tsdbGetTxnFname
(
STsdb
*
pRepo
,
TSDB_TXN_FILE_T
ftype
,
char
fname
[])
{
snprintf
(
fname
,
TSDB_FILENAME_LEN
,
"%s/vnode/vnode%d/
tsdb
/%s"
,
tfsGetPrimaryPath
(
REPO_TFS
(
pRepo
)),
REPO_ID
(
pRepo
),
tsdbTxnFname
[
ftype
]);
snprintf
(
fname
,
TSDB_FILENAME_LEN
,
"%s/vnode/vnode%d/
%s
/%s"
,
tfsGetPrimaryPath
(
REPO_TFS
(
pRepo
)),
REPO_ID
(
pRepo
),
TSDB_LEVEL_DNAME
[
REPO_LEVEL
(
pRepo
)],
tsdbTxnFname
[
ftype
]);
}
static
int
tsdbOpenFSFromCurrent
(
STsdb
*
pRepo
)
{
...
...
@@ -719,7 +721,7 @@ static int tsdbScanRootDir(STsdb *pRepo) {
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
const
STfsFile
*
pf
;
tsdbGetRootDir
(
REPO_ID
(
pRepo
),
rootDir
);
tsdbGetRootDir
(
REPO_ID
(
pRepo
),
REPO_LEVEL
(
pRepo
),
rootDir
);
STfsDir
*
tdir
=
tfsOpendir
(
REPO_TFS
(
pRepo
),
rootDir
);
if
(
tdir
==
NULL
)
{
tsdbError
(
"vgId:%d failed to open directory %s since %s"
,
REPO_ID
(
pRepo
),
rootDir
,
tstrerror
(
terrno
));
...
...
@@ -753,7 +755,7 @@ static int tsdbScanDataDir(STsdb *pRepo) {
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
const
STfsFile
*
pf
;
tsdbGetDataDir
(
REPO_ID
(
pRepo
),
dataDir
);
tsdbGetDataDir
(
REPO_ID
(
pRepo
),
REPO_LEVEL
(
pRepo
),
dataDir
);
STfsDir
*
tdir
=
tfsOpendir
(
REPO_TFS
(
pRepo
),
dataDir
);
if
(
tdir
==
NULL
)
{
tsdbError
(
"vgId:%d failed to open directory %s since %s"
,
REPO_ID
(
pRepo
),
dataDir
,
tstrerror
(
terrno
));
...
...
@@ -801,7 +803,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
regex_t
regex
;
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
tsdbGetDataDir
(
REPO_ID
(
pRepo
),
dataDir
);
tsdbGetDataDir
(
REPO_ID
(
pRepo
),
REPO_LEVEL
(
pRepo
),
dataDir
);
// Resource allocation and init
regcomp
(
&
regex
,
pattern
,
REG_EXTENDED
);
...
...
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
21333a9c
...
...
@@ -27,7 +27,7 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"rsma"
,
// TSDB_FILE_RSMA
};
static
const
char
*
TSDB_LEVEL_DNAME
[]
=
{
const
char
*
TSDB_LEVEL_DNAME
[]
=
{
"tsdb"
,
"rsma1"
,
"rsma2"
,
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
21333a9c
...
...
@@ -98,46 +98,46 @@ typedef struct SIOCostSummary {
}
SIOCostSummary
;
typedef
struct
SBlockLoadSuppInfo
{
SColumnDataAgg
*
pstatis
;
SColumnDataAgg
**
plist
;
SArray
*
defaultLoadColumn
;
// default load column
int32_t
*
slotIds
;
// colId to slotId
SColumnDataAgg
*
pstatis
;
SColumnDataAgg
**
plist
;
SArray
*
defaultLoadColumn
;
// default load column
int32_t
*
slotIds
;
// colId to slotId
}
SBlockLoadSuppInfo
;
typedef
struct
STsdbReadHandle
{
STsdb
*
pTsdb
;
SQueryFilePos
cur
;
// current position
int16_t
order
;
STimeWindow
window
;
// the primary query time window that applies to all queries
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
int32_t
numOfBlocks
;
SArray
*
pColumns
;
// column list, SColumnInfoData array list
bool
locateStart
;
int32_t
outputCapacity
;
int32_t
realNumOfRows
;
SArray
*
pTableCheckInfo
;
// SArray<STableCheckInfo>
int32_t
activeIndex
;
bool
checkFiles
;
// check file stage
int8_t
cachelastrow
;
// check if last row cached
bool
loadExternalRow
;
// load time window external data rows
bool
currentLoadExternalRows
;
// current load external rows
int32_t
loadType
;
// block load type
char
*
idStr
;
// query info handle, for debug purpose
STsdb
*
pTsdb
;
SQueryFilePos
cur
;
// current position
int16_t
order
;
STimeWindow
window
;
// the primary query time window that applies to all queries
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
int32_t
numOfBlocks
;
SArray
*
pColumns
;
// column list, SColumnInfoData array list
bool
locateStart
;
int32_t
outputCapacity
;
int32_t
realNumOfRows
;
SArray
*
pTableCheckInfo
;
// SArray<STableCheckInfo>
int32_t
activeIndex
;
bool
checkFiles
;
// check file stage
int8_t
cachelastrow
;
// check if last row cached
bool
loadExternalRow
;
// load time window external data rows
bool
currentLoadExternalRows
;
// current load external rows
int32_t
loadType
;
// block load type
char
*
idStr
;
// query info handle, for debug purpose
int32_t
type
;
// query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
SDFileSet
*
pFileGroup
;
SFSIter
fileIter
;
SReadH
rhelper
;
STableBlockInfo
*
pDataBlockInfo
;
SDataCols
*
pDataCols
;
// in order to hold current file data block
int32_t
allocSize
;
// allocated data block size
SDataBlockLoadInfo
dataBlockLoadInfo
;
/* record current block load information */
SLoadCompBlockInfo
compBlockLoadInfo
;
/* record current compblock information in SQueryAttr */
SDataCols
*
pDataCols
;
// in order to hold current file data block
int32_t
allocSize
;
// allocated data block size
SDataBlockLoadInfo
dataBlockLoadInfo
;
/* record current block load information */
SLoadCompBlockInfo
compBlockLoadInfo
;
/* record current compblock information in SQueryAttr */
SBlockLoadSuppInfo
suppInfo
;
SArray
*
prev
;
// previous row which is before than time window
SArray
*
next
;
// next row which is after the query time window
SIOCostSummary
cost
;
STSchema
*
pSchema
;
SArray
*
prev
;
// previous row which is before than time window
SArray
*
next
;
// next row which is after the query time window
SIOCostSummary
cost
;
STSchema
*
pSchema
;
}
STsdbReadHandle
;
typedef
struct
STableGroupSupporter
{
...
...
@@ -164,6 +164,8 @@ static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
// static void* destroyTableCheckInfo(SArray* pTableCheckInfo);
static
bool
tsdbGetExternalRow
(
tsdbReaderT
pHandle
);
static
STsdb
*
getTsdbByRetentions
(
SVnode
*
pVnode
,
TSKEY
winSKey
,
SRetention
*
retentions
);
static
void
tsdbInitDataBlockLoadInfo
(
SDataBlockLoadInfo
*
pBlockLoadInfo
)
{
pBlockLoadInfo
->
slot
=
-
1
;
pBlockLoadInfo
->
uid
=
0
;
...
...
@@ -350,12 +352,38 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData
pTsdbReadHandle
->
window
.
ekey
,
pTsdbReadHandle
->
idStr
);
}
}
#if 0
int nQUERY = 0;
#endif
static
STsdb
*
getTsdbByRetentions
(
SVnode
*
pVnode
,
TSKEY
winSKey
,
SRetention
*
retentions
)
{
if
(
vnodeIsRollup
(
pVnode
))
{
// for(int32_t i=0; i< TSDB_; ) {
// }
int
level
=
0
;
#if 1
int64_t
now
=
taosGetTimestamp
(
pVnode
->
config
.
tsdbCfg
.
precision
);
for
(
int
i
=
0
;
i
<
TSDB_RETENTION_MAX
;
++
i
)
{
SRetention
*
pRetention
=
retentions
+
i
;
if
(
pRetention
->
keep
<=
0
||
(
now
-
pRetention
->
keep
)
>=
winSKey
)
{
break
;
}
}
#endif
#if 0
++nQUERY;
if(nQUERY%3 == 0) {
level = 2;
} else if(nQUERY%2 == 0) {
level = 1;
} else {
level = 0;
}
#endif
if
(
level
==
TSDB_RETENTION_L0
)
{
return
VND_RSMA0
(
pVnode
);
}
else
if
(
level
==
TSDB_RETENTION_L1
)
{
return
VND_RSMA1
(
pVnode
);
}
else
{
return
VND_RSMA2
(
pVnode
);
}
}
return
pVnode
->
pTsdb
;
}
...
...
@@ -420,8 +448,10 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
}
pReadHandle
->
suppInfo
.
defaultLoadColumn
=
getDefaultLoadColumns
(
pReadHandle
,
true
);
pReadHandle
->
suppInfo
.
slotIds
=
taosMemoryMalloc
(
sizeof
(
int32_t
)
*
taosArrayGetSize
(
pReadHandle
->
suppInfo
.
defaultLoadColumn
));
pReadHandle
->
suppInfo
.
plist
=
taosMemoryCalloc
(
taosArrayGetSize
(
pReadHandle
->
suppInfo
.
defaultLoadColumn
),
POINTER_BYTES
);
pReadHandle
->
suppInfo
.
slotIds
=
taosMemoryMalloc
(
sizeof
(
int32_t
)
*
taosArrayGetSize
(
pReadHandle
->
suppInfo
.
defaultLoadColumn
));
pReadHandle
->
suppInfo
.
plist
=
taosMemoryCalloc
(
taosArrayGetSize
(
pReadHandle
->
suppInfo
.
defaultLoadColumn
),
POINTER_BYTES
);
}
pReadHandle
->
pDataCols
=
tdNewDataCols
(
1000
,
pVnode
->
config
.
tsdbCfg
.
maxRows
);
...
...
@@ -444,7 +474,6 @@ _end:
tsdbReaderT
*
tsdbQueryTables
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
uint64_t
taskId
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
tsdbQueryTablesImpl
(
pVnode
,
pCond
,
qId
,
taskId
);
if
(
pTsdbReadHandle
==
NULL
)
{
return
NULL
;
...
...
@@ -462,16 +491,16 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG
return
NULL
;
}
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pTsdbReadHandle
->
pTableCheckInfo
,
0
);
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pTsdbReadHandle
->
pTableCheckInfo
,
0
);
pTsdbReadHandle
->
pSchema
=
metaGetTbTSchema
(
pVnode
->
pMeta
,
pCheckInfo
->
tableId
,
0
);
int32_t
numOfCols
=
taosArrayGetSize
(
pTsdbReadHandle
->
suppInfo
.
defaultLoadColumn
);
int32_t
numOfCols
=
taosArrayGetSize
(
pTsdbReadHandle
->
suppInfo
.
defaultLoadColumn
);
int16_t
*
ids
=
pTsdbReadHandle
->
suppInfo
.
defaultLoadColumn
->
pData
;
STSchema
*
pSchema
=
pTsdbReadHandle
->
pSchema
;
int32_t
i
=
0
,
j
=
0
;
while
(
i
<
numOfCols
&&
j
<
pSchema
->
numOfCols
)
{
while
(
i
<
numOfCols
&&
j
<
pSchema
->
numOfCols
)
{
if
(
ids
[
i
]
==
pSchema
->
columns
[
j
].
colId
)
{
pTsdbReadHandle
->
suppInfo
.
slotIds
[
i
]
=
j
;
i
++
;
...
...
@@ -1137,7 +1166,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
int32_t
slotIndex
)
{
int64_t
st
=
taosGetTimestampUs
();
int32_t
code
=
tdInitDataCols
(
pTsdbReadHandle
->
pDataCols
,
pTsdbReadHandle
->
pSchema
);
int32_t
code
=
tdInitDataCols
(
pTsdbReadHandle
->
pDataCols
,
pTsdbReadHandle
->
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbError
(
"%p failed to malloc buf for pDataCols, %s"
,
pTsdbReadHandle
,
pTsdbReadHandle
->
idStr
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
21333a9c
...
...
@@ -229,10 +229,28 @@ int vnodeCommit(SVnode *pVnode) {
ASSERT
(
0
);
return
-
1
;
}
if
(
tsdbCommit
(
pVnode
->
pTsdb
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
if
(
vnodeIsRollup
(
pVnode
))
{
if
(
tsdbCommit
(
VND_RSMA0
(
pVnode
))
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
if
(
tsdbCommit
(
VND_RSMA1
(
pVnode
))
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
if
(
tsdbCommit
(
VND_RSMA2
(
pVnode
))
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
}
else
{
if
(
tsdbCommit
(
pVnode
->
pTsdb
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
}
if
(
tqCommit
(
pVnode
->
pTq
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
...
...
source/libs/transport/src/transCli.c
浏览文件 @
21333a9c
...
...
@@ -949,9 +949,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
}
if
(
pCtx
->
pSem
!=
NULL
)
{
tTrace
(
"%s cli conn %p handle resp"
,
pTransInst
->
label
,
pConn
);
memcpy
((
char
*
)
pCtx
->
pRsp
,
(
char
*
)
pResp
,
sizeof
(
*
pResp
));
tTrace
(
"%s cli conn %p(sync) handle resp"
,
pTransInst
->
label
,
pConn
);
if
(
pCtx
->
pRsp
==
NULL
)
{
tTrace
(
"%s cli conn %p(sync) failed to resp, ignore"
,
pTransInst
->
label
,
pConn
);
}
else
{
memcpy
((
char
*
)
pCtx
->
pRsp
,
(
char
*
)
pResp
,
sizeof
(
*
pResp
));
}
tsem_post
(
pCtx
->
pSem
);
pCtx
->
pRsp
=
NULL
;
}
else
{
tTrace
(
"%s cli conn %p handle resp"
,
pTransInst
->
label
,
pConn
);
pTransInst
->
cfp
(
pTransInst
->
parent
,
pResp
,
pEpSet
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录