Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ed6276bd
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ed6276bd
编写于
6月 27, 2022
作者:
M
Minglei Jin
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feat/tsdb_refact' of
https://github.com/taosdata/TDengine
into feat/tsdb_refact
上级
03d5b62d
d814016e
变更
14
展开全部
隐藏空白更改
内联
并排
Showing
14 changed file
with
1592 addition
and
980 deletion
+1592
-980
contrib/test/CMakeLists.txt
contrib/test/CMakeLists.txt
+1
-0
contrib/test/lz4/CMakeLists.txt
contrib/test/lz4/CMakeLists.txt
+6
-0
contrib/test/lz4/main.c
contrib/test/lz4/main.c
+8
-0
include/common/tcommon.h
include/common/tcommon.h
+5
-5
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+4
-5
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+16
-7
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+8
-2
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+65
-1
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+101
-0
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+974
-760
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+242
-168
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+156
-19
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+1
-12
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+5
-1
未找到文件。
contrib/test/CMakeLists.txt
浏览文件 @
ed6276bd
...
@@ -24,3 +24,4 @@ if(${BUILD_WITH_TRAFT})
...
@@ -24,3 +24,4 @@ if(${BUILD_WITH_TRAFT})
endif
(
${
BUILD_WITH_TRAFT
}
)
endif
(
${
BUILD_WITH_TRAFT
}
)
add_subdirectory
(
tdev
)
add_subdirectory
(
tdev
)
add_subdirectory
(
lz4
)
contrib/test/lz4/CMakeLists.txt
0 → 100644
浏览文件 @
ed6276bd
add_executable
(
lz4_test
""
)
target_sources
(
lz4_test
PRIVATE
"main.c"
)
target_link_libraries
(
lz4_test lz4_static
)
\ No newline at end of file
contrib/test/lz4/main.c
0 → 100644
浏览文件 @
ed6276bd
#include <stdio.h>
#include "lz4.h"
int
main
(
int
argc
,
char
const
*
argv
[])
{
printf
(
"%d
\n
"
,
LZ4_compressBound
(
1024
));
return
0
;
}
include/common/tcommon.h
浏览文件 @
ed6276bd
...
@@ -101,17 +101,17 @@ typedef struct SColumnInfoData {
...
@@ -101,17 +101,17 @@ typedef struct SColumnInfoData {
}
SColumnInfoData
;
}
SColumnInfoData
;
typedef
struct
SQueryTableDataCond
{
typedef
struct
SQueryTableDataCond
{
// STimeWindow twindow;
uint64_t
suid
;
uint64_t
suid
;
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
int32_t
numOfCols
;
SColumnInfo
*
colList
;
SColumnInfo
*
colList
;
bool
loadExternalRows
;
// load external rows or not
int32_t
type
;
// data block load type:
int32_t
type
;
// data block load type:
int32_t
numOfTWindows
;
int32_t
numOfTWindows
;
STimeWindow
*
twindows
;
STimeWindow
*
twindows
;
int64_t
startVersion
;
int32_t
numOfTables
;
// number of tables
int64_t
endVersion
;
uint64_t
*
uidList
;
// table uid list
int64_t
startVersion
;
// start version
int64_t
endVersion
;
// end version
}
SQueryTableDataCond
;
}
SQueryTableDataCond
;
void
*
blockDataDestroy
(
SSDataBlock
*
pBlock
);
void
*
blockDataDestroy
(
SSDataBlock
*
pBlock
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
ed6276bd
...
@@ -116,12 +116,11 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
...
@@ -116,12 +116,11 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
// typedef struct STsdb STsdb;
// typedef struct STsdb STsdb;
typedef
struct
STsdbReader
STsdbReader
;
typedef
struct
STsdbReader
STsdbReader
;
#define BLOCK_LOAD_OFFSET_
SEQ_ORDER
1
#define BLOCK_LOAD_OFFSET_
ORDER
1
#define BLOCK_LOAD_TABLE
_
SEQ_ORDER 2
#define BLOCK_LOAD_TABLESEQ_ORDER 2
#define BLOCK_LOAD_
TABLE_RR_ORDER
3
#define BLOCK_LOAD_
EXTERN_ORDER
3
int32_t
tsdbReaderOpen
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STableListInfo
*
tableInfoGroup
,
uint64_t
qId
,
int32_t
tsdbReaderOpen
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STableListInfo
*
pTableList
,
uint64_t
qId
,
uint64_t
taskId
,
STsdbReader
**
ppReader
);
uint64_t
taskId
,
STsdbReader
**
ppReader
);
void
tsdbReaderClose
(
STsdbReader
*
pReader
);
void
tsdbReaderClose
(
STsdbReader
*
pReader
);
bool
tsdbNextDataBlock
(
STsdbReader
*
pReader
);
bool
tsdbNextDataBlock
(
STsdbReader
*
pReader
);
void
tsdbRetrieveDataBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
);
void
tsdbRetrieveDataBlockInfo
(
STsdbReader
*
pReader
,
SDataBlockInfo
*
pDataBlockInfo
);
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
ed6276bd
...
@@ -86,8 +86,8 @@ typedef struct STsdbFSState STsdbFSState;
...
@@ -86,8 +86,8 @@ typedef struct STsdbFSState STsdbFSState;
#define TSDBROW_VERSION(ROW) (((ROW)->type == 0) ? (ROW)->version : (ROW)->pBlockData->aVersion[(ROW)->iRow])
#define TSDBROW_VERSION(ROW) (((ROW)->type == 0) ? (ROW)->version : (ROW)->pBlockData->aVersion[(ROW)->iRow])
#define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow)
#define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow)
#define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)})
#define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)})
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)})
;
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)})
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
;
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
void
tsdbRowGetColVal
(
TSDBROW
*
pRow
,
STSchema
*
pTSchema
,
int32_t
iCol
,
SColVal
*
pColVal
);
void
tsdbRowGetColVal
(
TSDBROW
*
pRow
,
STSchema
*
pTSchema
,
int32_t
iCol
,
SColVal
*
pColVal
);
int32_t
tPutTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
);
int32_t
tPutTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
);
int32_t
tGetTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
);
int32_t
tGetTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
);
...
@@ -132,6 +132,7 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
...
@@ -132,6 +132,7 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs);
void
tColDataReset
(
SColData
*
pColData
,
int16_t
cid
,
int8_t
type
);
void
tColDataReset
(
SColData
*
pColData
,
int16_t
cid
,
int8_t
type
);
void
tColDataClear
(
void
*
ph
);
void
tColDataClear
(
void
*
ph
);
int32_t
tColDataAppendValue
(
SColData
*
pColData
,
SColVal
*
pColVal
);
int32_t
tColDataAppendValue
(
SColData
*
pColData
,
SColVal
*
pColVal
);
int32_t
tColDataCopy
(
SColData
*
pColDataSrc
,
SColData
*
pColDataDest
);
int32_t
tColDataGetValue
(
SColData
*
pColData
,
int32_t
iRow
,
SColVal
*
pColVal
);
int32_t
tColDataGetValue
(
SColData
*
pColData
,
int32_t
iRow
,
SColVal
*
pColVal
);
int32_t
tColDataPCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
tColDataPCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
// SBlockData
// SBlockData
...
@@ -140,7 +141,10 @@ int32_t tColDataPCmprFn(const void *p1, const void *p2);
...
@@ -140,7 +141,10 @@ int32_t tColDataPCmprFn(const void *p1, const void *p2);
int32_t
tBlockDataInit
(
SBlockData
*
pBlockData
);
int32_t
tBlockDataInit
(
SBlockData
*
pBlockData
);
void
tBlockDataReset
(
SBlockData
*
pBlockData
);
void
tBlockDataReset
(
SBlockData
*
pBlockData
);
void
tBlockDataClear
(
SBlockData
*
pBlockData
);
void
tBlockDataClear
(
SBlockData
*
pBlockData
);
int32_t
tBlockDataAddColData
(
SBlockData
*
pBlockData
,
int32_t
iColData
,
SColData
**
ppColData
);
int32_t
tBlockDataAppendRow
(
SBlockData
*
pBlockData
,
TSDBROW
*
pRow
,
STSchema
*
pTSchema
);
int32_t
tBlockDataAppendRow
(
SBlockData
*
pBlockData
,
TSDBROW
*
pRow
,
STSchema
*
pTSchema
);
int32_t
tBlockDataMerge
(
SBlockData
*
pBlockData1
,
SBlockData
*
pBlockData2
,
SBlockData
*
pBlockData
);
int32_t
tBlockDataCopy
(
SBlockData
*
pBlockDataSrc
,
SBlockData
*
pBlockDataDest
);
// SDelIdx
// SDelIdx
int32_t
tPutDelIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tPutDelIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetDelIdx
(
uint8_t
*
p
,
void
*
ph
);
int32_t
tGetDelIdx
(
uint8_t
*
p
,
void
*
ph
);
...
@@ -179,6 +183,9 @@ bool tsdbTbDataIterNext(STbDataIter *pIter);
...
@@ -179,6 +183,9 @@ bool tsdbTbDataIterNext(STbDataIter *pIter);
// tsdbFile.c ==============================================================================================
// tsdbFile.c ==============================================================================================
typedef
enum
{
TSDB_HEAD_FILE
=
0
,
TSDB_DATA_FILE
,
TSDB_LAST_FILE
,
TSDB_SMA_FILE
}
EDataFileT
;
typedef
enum
{
TSDB_HEAD_FILE
=
0
,
TSDB_DATA_FILE
,
TSDB_LAST_FILE
,
TSDB_SMA_FILE
}
EDataFileT
;
void
tsdbDataFileName
(
STsdb
*
pTsdb
,
SDFileSet
*
pDFileSet
,
EDataFileT
ftype
,
char
fname
[]);
void
tsdbDataFileName
(
STsdb
*
pTsdb
,
SDFileSet
*
pDFileSet
,
EDataFileT
ftype
,
char
fname
[]);
bool
tsdbFileIsSame
(
SDFileSet
*
pDFileSet1
,
SDFileSet
*
pDFileSet2
,
EDataFileT
ftype
);
int32_t
tsdbUpdateDFileHdr
(
TdFilePtr
pFD
,
SDFileSet
*
pSet
,
EDataFileT
ftype
);
int32_t
tsdbDFileRollback
(
STsdb
*
pTsdb
,
SDFileSet
*
pSet
,
EDataFileT
ftype
);
int32_t
tPutDataFileHdr
(
uint8_t
*
p
,
SDFileSet
*
pSet
,
EDataFileT
ftype
);
int32_t
tPutDataFileHdr
(
uint8_t
*
p
,
SDFileSet
*
pSet
,
EDataFileT
ftype
);
int32_t
tPutDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
);
int32_t
tPutDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
);
int32_t
tGetDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
);
int32_t
tGetDelFile
(
uint8_t
*
p
,
SDelFile
*
pDelFile
);
...
@@ -201,7 +208,7 @@ SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid);
...
@@ -201,7 +208,7 @@ SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid);
// SDataFWriter
// SDataFWriter
int32_t
tsdbDataFWriterOpen
(
SDataFWriter
**
ppWriter
,
STsdb
*
pTsdb
,
SDFileSet
*
pSet
);
int32_t
tsdbDataFWriterOpen
(
SDataFWriter
**
ppWriter
,
STsdb
*
pTsdb
,
SDFileSet
*
pSet
);
int32_t
tsdbDataFWriterClose
(
SDataFWriter
**
ppWriter
,
int8_t
sync
);
int32_t
tsdbDataFWriterClose
(
SDataFWriter
**
ppWriter
,
int8_t
sync
);
int32_t
tsdbUpdateDFileSetHeader
(
SDataFWriter
*
pWriter
,
uint8_t
**
ppBuf
);
int32_t
tsdbUpdateDFileSetHeader
(
SDataFWriter
*
pWriter
);
int32_t
tsdbWriteBlockIdx
(
SDataFWriter
*
pWriter
,
SMapData
*
pMapData
,
uint8_t
**
ppBuf
);
int32_t
tsdbWriteBlockIdx
(
SDataFWriter
*
pWriter
,
SMapData
*
pMapData
,
uint8_t
**
ppBuf
);
int32_t
tsdbWriteBlock
(
SDataFWriter
*
pWriter
,
SMapData
*
pMapData
,
uint8_t
**
ppBuf
,
SBlockIdx
*
pBlockIdx
);
int32_t
tsdbWriteBlock
(
SDataFWriter
*
pWriter
,
SMapData
*
pMapData
,
uint8_t
**
ppBuf
,
SBlockIdx
*
pBlockIdx
);
int32_t
tsdbWriteBlockData
(
SDataFWriter
*
pWriter
,
SBlockData
*
pBlockData
,
uint8_t
**
ppBuf1
,
uint8_t
**
ppBuf2
,
int32_t
tsdbWriteBlockData
(
SDataFWriter
*
pWriter
,
SBlockData
*
pBlockData
,
uint8_t
**
ppBuf1
,
uint8_t
**
ppBuf2
,
...
@@ -364,15 +371,18 @@ typedef struct {
...
@@ -364,15 +371,18 @@ typedef struct {
int8_t
type
;
int8_t
type
;
int8_t
flag
;
// HAS_NONE|HAS_NULL|HAS_VALUE
int8_t
flag
;
// HAS_NONE|HAS_NULL|HAS_VALUE
int64_t
offset
;
int64_t
offset
;
int64_t
size
;
int64_t
bsize
;
// bitmap size
int64_t
csize
;
// compressed column value size
int64_t
osize
;
// original column value size (only save for variant data type)
}
SBlockCol
;
}
SBlockCol
;
typedef
struct
{
typedef
struct
{
int64_t
nRow
;
int64_t
nRow
;
int8_t
cmprAlg
;
int8_t
cmprAlg
;
int64_t
offset
;
int64_t
offset
;
int64_t
ksize
;
int64_t
vsize
;
// VERSION size
int64_t
bsize
;
int64_t
ksize
;
// TSKEY size
int64_t
bsize
;
// total block size
SMapData
mBlockCol
;
// SMapData<SBlockCol>
SMapData
mBlockCol
;
// SMapData<SBlockCol>
}
SSubBlock
;
}
SSubBlock
;
...
@@ -408,7 +418,6 @@ struct SColData {
...
@@ -408,7 +418,6 @@ struct SColData {
int32_t
*
aOffset
;
int32_t
*
aOffset
;
int32_t
nData
;
int32_t
nData
;
uint8_t
*
pData
;
uint8_t
*
pData
;
uint8_t
*
pBuf
;
};
};
struct
SBlockData
{
struct
SBlockData
{
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
ed6276bd
...
@@ -782,7 +782,13 @@ static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx)
...
@@ -782,7 +782,13 @@ static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx)
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
tBlockReset
(
pBlockN
);
tBlockReset
(
pBlockN
);
pBlockN
->
last
=
1
;
pBlockN
->
minKey
=
pBlockO
->
minKey
;
pBlockN
->
maxKey
=
pBlockO
->
maxKey
;
pBlockN
->
minVersion
=
pBlockO
->
minVersion
;
pBlockN
->
maxVersion
=
pBlockO
->
maxVersion
;
pBlockN
->
nRow
=
pBlockO
->
nRow
;
pBlockN
->
last
=
pBlockO
->
last
;
pBlockN
->
hasDup
=
pBlockO
->
hasDup
;
code
=
tsdbWriteBlockData
(
pCommitter
->
pWriter
,
pBlockDataO
,
NULL
,
NULL
,
pBlockIdx
,
pBlockN
,
pCommitter
->
cmprAlg
);
code
=
tsdbWriteBlockData
(
pCommitter
->
pWriter
,
pBlockDataO
,
NULL
,
NULL
,
pBlockIdx
,
pBlockN
,
pCommitter
->
cmprAlg
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
...
@@ -964,7 +970,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
...
@@ -964,7 +970,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// update file header
// update file header
code
=
tsdbUpdateDFileSetHeader
(
pCommitter
->
pWriter
,
NULL
);
code
=
tsdbUpdateDFileSetHeader
(
pCommitter
->
pWriter
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
// upsert SDFileSet
// upsert SDFileSet
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
ed6276bd
...
@@ -171,7 +171,71 @@ _err:
...
@@ -171,7 +171,71 @@ _err:
static
int32_t
tsdbApplyDFileSetChange
(
STsdbFS
*
pFS
,
SDFileSet
*
pFrom
,
SDFileSet
*
pTo
)
{
static
int32_t
tsdbApplyDFileSetChange
(
STsdbFS
*
pFS
,
SDFileSet
*
pFrom
,
SDFileSet
*
pTo
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
// TODO
char
fname
[
TSDB_FILENAME_LEN
];
if
(
pFrom
&&
pTo
)
{
// head
if
(
tsdbFileIsSame
(
pFrom
,
pTo
,
TSDB_HEAD_FILE
))
{
ASSERT
(
0
);
}
else
{
tsdbDataFileName
(
pFS
->
pTsdb
,
pFrom
,
TSDB_HEAD_FILE
,
fname
);
taosRemoveFile
(
fname
);
}
// data
if
(
tsdbFileIsSame
(
pFrom
,
pTo
,
TSDB_DATA_FILE
))
{
if
(
pFrom
->
fData
.
size
>
pTo
->
fData
.
size
)
{
code
=
tsdbDFileRollback
(
pFS
->
pTsdb
,
pTo
,
TSDB_DATA_FILE
);
if
(
code
)
goto
_err
;
}
}
else
{
tsdbDataFileName
(
pFS
->
pTsdb
,
pFrom
,
TSDB_DATA_FILE
,
fname
);
taosRemoveFile
(
fname
);
}
// last
if
(
tsdbFileIsSame
(
pFrom
,
pTo
,
TSDB_LAST_FILE
))
{
if
(
pFrom
->
fLast
.
size
>
pTo
->
fLast
.
size
)
{
code
=
tsdbDFileRollback
(
pFS
->
pTsdb
,
pTo
,
TSDB_LAST_FILE
);
if
(
code
)
goto
_err
;
}
}
else
{
tsdbDataFileName
(
pFS
->
pTsdb
,
pFrom
,
TSDB_LAST_FILE
,
fname
);
taosRemoveFile
(
fname
);
}
// sma
if
(
tsdbFileIsSame
(
pFrom
,
pTo
,
TSDB_SMA_FILE
))
{
if
(
pFrom
->
fSma
.
size
>
pTo
->
fSma
.
size
)
{
code
=
tsdbDFileRollback
(
pFS
->
pTsdb
,
pTo
,
TSDB_SMA_FILE
);
if
(
code
)
goto
_err
;
}
}
else
{
tsdbDataFileName
(
pFS
->
pTsdb
,
pFrom
,
TSDB_SMA_FILE
,
fname
);
taosRemoveFile
(
fname
);
}
}
else
if
(
pFrom
)
{
// head
tsdbDataFileName
(
pFS
->
pTsdb
,
pFrom
,
TSDB_HEAD_FILE
,
fname
);
taosRemoveFile
(
fname
);
// data
tsdbDataFileName
(
pFS
->
pTsdb
,
pFrom
,
TSDB_DATA_FILE
,
fname
);
taosRemoveFile
(
fname
);
// last
tsdbDataFileName
(
pFS
->
pTsdb
,
pFrom
,
TSDB_LAST_FILE
,
fname
);
taosRemoveFile
(
fname
);
// fsm
tsdbDataFileName
(
pFS
->
pTsdb
,
pFrom
,
TSDB_SMA_FILE
,
fname
);
taosRemoveFile
(
fname
);
}
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb apply disk file set change failed since %s"
,
TD_VID
(
pFS
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
return
code
;
}
}
...
...
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
ed6276bd
...
@@ -120,6 +120,107 @@ void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char
...
@@ -120,6 +120,107 @@ void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char
}
}
}
}
bool
tsdbFileIsSame
(
SDFileSet
*
pDFileSet1
,
SDFileSet
*
pDFileSet2
,
EDataFileT
ftype
)
{
if
(
pDFileSet1
->
diskId
.
level
!=
pDFileSet2
->
diskId
.
level
||
pDFileSet1
->
diskId
.
id
!=
pDFileSet2
->
diskId
.
id
)
{
return
false
;
}
switch
(
ftype
)
{
case
TSDB_HEAD_FILE
:
return
pDFileSet1
->
fHead
.
commitID
==
pDFileSet2
->
fHead
.
commitID
;
case
TSDB_DATA_FILE
:
return
pDFileSet1
->
fData
.
commitID
==
pDFileSet2
->
fData
.
commitID
;
case
TSDB_LAST_FILE
:
return
pDFileSet1
->
fLast
.
commitID
==
pDFileSet2
->
fLast
.
commitID
;
case
TSDB_SMA_FILE
:
return
pDFileSet1
->
fSma
.
commitID
==
pDFileSet2
->
fSma
.
commitID
;
default:
ASSERT
(
0
);
break
;
}
}
int32_t
tsdbUpdateDFileHdr
(
TdFilePtr
pFD
,
SDFileSet
*
pSet
,
EDataFileT
ftype
)
{
int32_t
code
=
0
;
int64_t
n
;
char
hdr
[
TSDB_FHDR_SIZE
];
memset
(
hdr
,
0
,
TSDB_FHDR_SIZE
);
tPutDataFileHdr
(
hdr
,
pSet
,
ftype
);
taosCalcChecksumAppend
(
0
,
hdr
,
TSDB_FHDR_SIZE
);
n
=
taosLSeekFile
(
pFD
,
0
,
SEEK_SET
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_exit
;
}
n
=
taosWriteFile
(
pFD
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_exit
;
}
_exit:
return
code
;
}
int32_t
tsdbDFileRollback
(
STsdb
*
pTsdb
,
SDFileSet
*
pSet
,
EDataFileT
ftype
)
{
int32_t
code
=
0
;
int64_t
size
;
TdFilePtr
pFD
;
char
fname
[
TSDB_FILENAME_LEN
];
tsdbDataFileName
(
pTsdb
,
pSet
,
ftype
,
fname
);
// open
pFD
=
taosOpenFile
(
fname
,
TD_FILE_WRITE
);
if
(
pFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// truncate
switch
(
ftype
)
{
case
TSDB_HEAD_FILE
:
size
=
pSet
->
fHead
.
size
;
break
;
case
TSDB_DATA_FILE
:
size
=
pSet
->
fData
.
size
;
break
;
case
TSDB_LAST_FILE
:
size
=
pSet
->
fLast
.
size
;
break
;
case
TSDB_SMA_FILE
:
size
=
pSet
->
fSma
.
size
;
break
;
default:
ASSERT
(
0
);
}
if
(
taosFtruncateFile
(
pFD
,
size
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// update header
code
=
tsdbUpdateDFileHdr
(
pFD
,
pSet
,
ftype
);
if
(
code
)
goto
_err
;
// sync
if
(
taosFsyncFile
(
pFD
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// close
taosCloseFile
(
&
pFD
);
return
code
;
_err:
return
code
;
}
int32_t
tPutDataFileHdr
(
uint8_t
*
p
,
SDFileSet
*
pSet
,
EDataFileT
ftype
)
{
int32_t
tPutDataFileHdr
(
uint8_t
*
p
,
SDFileSet
*
pSet
,
EDataFileT
ftype
)
{
int32_t
n
=
0
;
int32_t
n
=
0
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
ed6276bd
此差异已折叠。
点击以展开。
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
浏览文件 @
ed6276bd
...
@@ -599,93 +599,234 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
...
@@ -599,93 +599,234 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl
return
code
;
return
code
;
}
}
int32_t
tsdbReadBlockData
(
SDataFReader
*
pReader
,
SBlockIdx
*
pBlockIdx
,
SBlock
*
pBlock
,
SBlockData
*
pBlockData
,
static
int32_t
tsdbReadSubBlockData
(
SDataFReader
*
pReader
,
SBlockIdx
*
pBlockIdx
,
SBlock
*
pBlock
,
int32_t
iSubBlock
,
uint8_t
**
ppBuf1
,
uint8_t
**
ppBuf2
)
{
SBlockData
*
pBlockData
,
uint8_t
**
ppBuf1
,
uint8_t
**
ppBuf2
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
uint8_t
*
p
;
int64_t
size
;
int64_t
n
;
TdFilePtr
pFD
=
pBlock
->
last
?
pReader
->
pLastFD
:
pReader
->
pDataFD
;
TdFilePtr
pFD
=
pBlock
->
last
?
pReader
->
pLastFD
:
pReader
->
pDataFD
;
uint8_t
*
pBuf1
=
NULL
;
SSubBlock
*
pSubBlock
=
&
pBlock
->
aSubBlock
[
iSubBlock
];
uint8_t
*
pBuf2
=
NULL
;
SBlockCol
*
pBlockCol
=
&
(
SBlockCol
){
0
};
SBlockCol
*
pBlockCol
=
&
(
SBlockCol
){};
if
(
!
ppBuf1
)
ppBuf1
=
&
pBuf1
;
tBlockDataReset
(
pBlockData
);
if
(
!
ppBuf2
)
ppBuf2
=
&
pBuf2
;
for
(
int32_t
iSubBlock
=
0
;
iSubBlock
<
pBlock
->
nSubBlock
;
iSubBlock
++
)
{
// realloc
SSubBlock
*
pSubBlock
=
&
pBlock
->
aSubBlock
[
iSubBlock
];
code
=
tsdbRealloc
(
ppBuf1
,
pSubBlock
->
bsize
);
uint8_t
*
p
;
if
(
code
)
goto
_err
;
int64_t
n
;
// realloc
// seek
code
=
tsdbRealloc
(
ppBuf1
,
pSubBlock
->
bsize
);
n
=
taosLSeekFile
(
pFD
,
pSubBlock
->
offset
,
SEEK_SET
);
if
(
code
)
goto
_err
;
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// seek
// read
n
=
taosLSeekFile
(
pFD
,
pSubBlock
->
offset
,
SEEK_SET
);
n
=
taosReadFile
(
pFD
,
*
ppBuf1
,
pSubBlock
->
bsize
);
if
(
n
<
0
)
{
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
else
if
(
n
<
pSubBlock
->
bsize
)
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
// check
p
=
*
ppBuf1
;
SBlockDataHdr
*
pHdr
=
(
SBlockDataHdr
*
)
p
;
ASSERT
(
pHdr
->
delimiter
==
TSDB_FILE_DLMT
);
ASSERT
(
pHdr
->
suid
==
pBlockIdx
->
suid
);
ASSERT
(
pHdr
->
uid
==
pBlockIdx
->
uid
);
p
+=
sizeof
(
*
pHdr
);
if
(
!
taosCheckChecksumWhole
(
p
,
pSubBlock
->
vsize
+
pSubBlock
->
ksize
+
sizeof
(
TSCKSUM
)))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
}
p
+=
(
pSubBlock
->
vsize
+
pSubBlock
->
ksize
+
sizeof
(
TSCKSUM
));
for
(
int32_t
iBlockCol
=
0
;
iBlockCol
<
pSubBlock
->
mBlockCol
.
nItem
;
iBlockCol
++
)
{
tMapDataGetItemByIdx
(
&
pSubBlock
->
mBlockCol
,
iBlockCol
,
pBlockCol
,
tGetBlockCol
);
ASSERT
(
pBlockCol
->
flag
&&
pBlockCol
->
flag
!=
HAS_NONE
);
if
(
pBlockCol
->
flag
==
HAS_NULL
)
continue
;
if
(
!
taosCheckChecksumWhole
(
p
,
pBlockCol
->
bsize
+
pBlockCol
->
csize
+
sizeof
(
TSCKSUM
)))
{
code
=
TSDB_CODE_FILE_CORRUPTED
;
goto
_err
;
goto
_err
;
}
}
p
=
p
+
pBlockCol
->
bsize
+
pBlockCol
->
csize
+
sizeof
(
TSCKSUM
);
}
// recover
pBlockData
->
nRow
=
pSubBlock
->
nRow
;
p
=
*
ppBuf1
+
sizeof
(
*
pHdr
);
code
=
tsdbRealloc
((
uint8_t
**
)
&
pBlockData
->
aVersion
,
pBlockData
->
nRow
*
sizeof
(
int64_t
));
if
(
code
)
goto
_err
;
code
=
tsdbRealloc
((
uint8_t
**
)
&
pBlockData
->
aTSKEY
,
pBlockData
->
nRow
*
sizeof
(
TSKEY
));
if
(
code
)
goto
_err
;
if
(
pSubBlock
->
cmprAlg
==
NO_COMPRESSION
)
{
ASSERT
(
pSubBlock
->
vsize
==
sizeof
(
int64_t
)
*
pSubBlock
->
nRow
);
ASSERT
(
pSubBlock
->
ksize
==
sizeof
(
TSKEY
)
*
pSubBlock
->
nRow
);
// VERSION
memcpy
(
pBlockData
->
aVersion
,
p
,
pSubBlock
->
vsize
);
// TSKEY
memcpy
(
pBlockData
->
aTSKEY
,
p
+
pSubBlock
->
vsize
,
pSubBlock
->
ksize
);
}
else
{
size
=
sizeof
(
int64_t
)
*
pSubBlock
->
nRow
+
COMP_OVERFLOW_BYTES
;
if
(
pSubBlock
->
cmprAlg
==
TWO_STAGE_COMP
)
{
code
=
tsdbRealloc
(
ppBuf2
,
size
);
if
(
code
)
goto
_err
;
}
// read
// VERSION
n
=
taosReadFile
(
pFD
,
*
ppBuf1
,
pSubBlock
->
bsize
);
n
=
tsDecompressBigint
(
p
,
pSubBlock
->
vsize
,
pSubBlock
->
nRow
,
(
char
*
)
pBlockData
->
aVersion
,
sizeof
(
int64_t
)
*
pSubBlock
->
nRow
,
pSubBlock
->
cmprAlg
,
*
ppBuf2
,
size
);
if
(
n
<
0
)
{
if
(
n
<
0
)
{
code
=
T
AOS_SYSTEM_ERROR
(
errno
)
;
code
=
T
SDB_CODE_COMPRESS_ERROR
;
goto
_err
;
goto
_err
;
}
else
if
(
n
<
pSubBlock
->
bsize
)
{
}
code
=
TSDB_CODE_FILE_CORRUPTED
;
// TSKEY
n
=
tsDecompressTimestamp
(
p
+
pSubBlock
->
vsize
,
pSubBlock
->
ksize
,
pSubBlock
->
nRow
,
(
char
*
)
pBlockData
->
aTSKEY
,
sizeof
(
TSKEY
)
*
pSubBlock
->
nRow
,
pSubBlock
->
cmprAlg
,
*
ppBuf2
,
size
);
if
(
n
<
0
)
{
code
=
TSDB_CODE_COMPRESS_ERROR
;
goto
_err
;
goto
_err
;
}
}
}
p
=
p
+
pSubBlock
->
vsize
+
pSubBlock
->
ksize
+
sizeof
(
TSCKSUM
);
// check
for
(
int32_t
iBlockCol
=
0
;
iBlockCol
<
pSubBlock
->
mBlockCol
.
nItem
;
iBlockCol
++
)
{
p
=
*
ppBuf1
;
SColData
*
pColData
;
SBlockDataHdr
*
pHdr
=
(
SBlockDataHdr
*
)
p
;
ASSERT
(
pHdr
->
delimiter
==
TSDB_FILE_DLMT
);
ASSERT
(
pHdr
->
suid
==
pBlockIdx
->
suid
);
ASSERT
(
pHdr
->
uid
==
pBlockIdx
->
uid
);
p
+=
sizeof
(
*
pHdr
);
if
(
!
taosCheckChecksumWhole
(
p
,
pSubBlock
->
ksize
))
{
tMapDataGetItemByIdx
(
&
pSubBlock
->
mBlockCol
,
iBlockCol
,
pBlockCol
,
tGetBlockCol
);
code
=
TSDB_CODE_FILE_CORRUPTED
;
ASSERT
(
pBlockCol
->
flag
&&
pBlockCol
->
flag
!=
HAS_NONE
);
goto
_err
;
code
=
tBlockDataAddColData
(
pBlockData
,
iBlockCol
,
&
pColData
);
if
(
code
)
goto
_err
;
tColDataReset
(
pColData
,
pBlockCol
->
cid
,
pBlockCol
->
type
);
if
(
pBlockCol
->
flag
==
HAS_NULL
)
{
for
(
int32_t
iRow
=
0
;
iRow
<
pSubBlock
->
nRow
;
iRow
++
)
{
code
=
tColDataAppendValue
(
pColData
,
&
COL_VAL_NULL
(
pBlockCol
->
cid
,
pBlockCol
->
type
));
if
(
code
)
goto
_err
;
}
continue
;
}
}
p
+=
pSubBlock
->
ksize
;
pColData
->
nVal
=
pSubBlock
->
nRow
;
pColData
->
flag
=
pBlockCol
->
flag
;
for
(
int32_t
iBlockCol
=
0
;
iBlockCol
<
pSubBlock
->
mBlockCol
.
nItem
;
iBlockCol
++
)
{
// bitmap
tMapDataGetItemByIdx
(
&
pSubBlock
->
mBlockCol
,
iBlockCol
,
pBlockCol
,
tGetBlockCol
);
if
(
pBlockCol
->
flag
!=
HAS_VALUE
)
{
size
=
BIT2_SIZE
(
pSubBlock
->
nRow
);
code
=
tsdbRealloc
(
&
pColData
->
pBitMap
,
size
);
if
(
code
)
goto
_err
;
ASSERT
(
pBlockCol
->
flag
&&
pBlockCol
->
flag
!=
HAS_NONE
);
ASSERT
(
pBlockCol
->
bsize
==
size
);
memcpy
(
pColData
->
pBitMap
,
p
,
size
);
}
else
{
ASSERT
(
pBlockCol
->
bsize
==
0
);
}
p
=
p
+
pBlockCol
->
bsize
;
// value
if
(
IS_VAR_DATA_TYPE
(
pBlockCol
->
type
))
{
pColData
->
nData
=
pBlockCol
->
osize
;
}
else
{
pColData
->
nData
=
tDataTypes
[
pBlockCol
->
type
].
bytes
*
pSubBlock
->
nRow
;
}
code
=
tsdbRealloc
(
&
pColData
->
pData
,
pColData
->
nData
);
if
(
code
)
goto
_err
;
if
(
pBlockCol
->
flag
==
HAS_NULL
)
continue
;
if
(
pSubBlock
->
cmprAlg
==
NO_COMPRESSION
)
{
memcpy
(
pColData
->
pData
,
p
,
pColData
->
nData
);
}
else
{
size
=
pColData
->
nData
+
COMP_OVERFLOW_BYTES
;
if
(
pSubBlock
->
cmprAlg
==
TWO_STAGE_COMP
)
{
code
=
tsdbRealloc
(
ppBuf2
,
size
);
if
(
code
)
goto
_err
;
}
if
(
!
taosCheckChecksumWhole
(
p
,
pBlockCol
->
size
))
{
n
=
tDataTypes
[
pBlockCol
->
type
].
decompFunc
(
p
,
pBlockCol
->
csize
,
pSubBlock
->
nRow
,
pColData
->
pData
,
pColData
->
nData
,
code
=
TSDB_CODE_FILE_CORRUPTED
;
pSubBlock
->
cmprAlg
,
*
ppBuf2
,
size
);
if
(
n
<
0
)
{
code
=
TSDB_CODE_COMPRESS_ERROR
;
goto
_err
;
goto
_err
;
}
}
p
+=
pBlockCol
->
size
;
ASSERT
(
n
==
pColData
->
nData
);
}
}
p
=
p
+
pBlockCol
->
csize
+
sizeof
(
TSCKSUM
);
}
// recover
// TODO
pBlockData
->
nRow
=
pSubBlock
->
nRow
;
return
code
;
p
=
*
ppBuf1
+
sizeof
(
*
pHdr
);
code
=
tsdbRealloc
((
uint8_t
**
)
&
pBlockData
->
aVersion
,
pBlockData
->
nRow
*
sizeof
(
int64_t
));
_err:
if
(
code
)
goto
_err
;
tsdbError
(
"vgId:%d tsdb read sub block data failed since %s"
,
TD_VID
(
pReader
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
code
=
tsdbRealloc
((
uint8_t
**
)
&
pBlockData
->
aTSKEY
,
pBlockData
->
nRow
*
sizeof
(
TSKEY
));
return
code
;
if
(
code
)
goto
_err
;
}
p
+=
pSubBlock
->
ksize
;
for
(
int32_t
iBlockCol
=
0
;
iBlockCol
<
pSubBlock
->
mBlockCol
.
nItem
;
iBlockCol
++
)
{
int32_t
tsdbReadBlockData
(
SDataFReader
*
pReader
,
SBlockIdx
*
pBlockIdx
,
SBlock
*
pBlock
,
SBlockData
*
pBlockData
,
tMapDataGetItemByIdx
(
&
pSubBlock
->
mBlockCol
,
iBlockCol
,
pBlockCol
,
tGetBlockCol
);
uint8_t
**
ppBuf1
,
uint8_t
**
ppBuf2
)
{
int32_t
code
=
0
;
TdFilePtr
pFD
=
pBlock
->
last
?
pReader
->
pLastFD
:
pReader
->
pDataFD
;
uint8_t
*
pBuf1
=
NULL
;
uint8_t
*
pBuf2
=
NULL
;
int32_t
iSubBlock
;
if
(
pBlockCol
->
flag
==
HAS_NONE
)
{
if
(
!
ppBuf1
)
ppBuf1
=
&
pBuf1
;
// All NULL value
if
(
!
ppBuf2
)
ppBuf2
=
&
pBuf2
;
}
else
{
// decompress
// read the first sub-block
p
+=
pBlockCol
->
size
;
iSubBlock
=
0
;
code
=
tsdbReadSubBlockData
(
pReader
,
pBlockIdx
,
pBlock
,
iSubBlock
,
pBlockData
,
ppBuf1
,
ppBuf2
);
if
(
code
)
goto
_err
;
// read remain block data and do merg
if
(
pBlock
->
nSubBlock
>
1
)
{
SBlockData
*
pBlockData1
=
&
(
SBlockData
){
0
};
SBlockData
*
pBlockData2
=
&
(
SBlockData
){
0
};
for
(
iSubBlock
=
1
;
iSubBlock
<
pBlock
->
nSubBlock
;
iSubBlock
++
)
{
code
=
tsdbReadSubBlockData
(
pReader
,
pBlockIdx
,
pBlock
,
iSubBlock
,
pBlockData
,
ppBuf1
,
ppBuf2
);
if
(
code
)
{
tBlockDataClear
(
pBlockData1
);
tBlockDataClear
(
pBlockData2
);
goto
_err
;
}
code
=
tBlockDataCopy
(
pBlockData
,
pBlockData2
);
if
(
code
)
{
tBlockDataClear
(
pBlockData1
);
tBlockDataClear
(
pBlockData2
);
goto
_err
;
}
// merge two block data
code
=
tBlockDataMerge
(
pBlockData1
,
pBlockData2
,
pBlockData
);
if
(
code
)
{
tBlockDataClear
(
pBlockData1
);
tBlockDataClear
(
pBlockData2
);
goto
_err
;
}
}
}
}
tBlockDataClear
(
pBlockData1
);
tBlockDataClear
(
pBlockData2
);
}
}
ASSERT
(
pBlock
->
nRow
==
pBlockData
->
nRow
);
ASSERT
(
tsdbKeyCmprFn
(
&
pBlock
->
minKey
,
&
TSDBROW_KEY
(
&
tBlockDataFirstRow
(
pBlockData
)))
==
0
);
ASSERT
(
tsdbKeyCmprFn
(
&
pBlock
->
maxKey
,
&
TSDBROW_KEY
(
&
tBlockDataLastRow
(
pBlockData
)))
==
0
);
if
(
pBuf1
)
tsdbFree
(
pBuf1
);
if
(
pBuf1
)
tsdbFree
(
pBuf1
);
if
(
pBuf2
)
tsdbFree
(
pBuf2
);
if
(
pBuf2
)
tsdbFree
(
pBuf2
);
return
code
;
return
code
;
...
@@ -906,99 +1047,35 @@ _err:
...
@@ -906,99 +1047,35 @@ _err:
return
code
;
return
code
;
}
}
int32_t
tsdbUpdateDFileSetHeader
(
SDataFWriter
*
pWriter
,
uint8_t
**
ppBuf
)
{
int32_t
tsdbUpdateDFileSetHeader
(
SDataFWriter
*
pWriter
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
int64_t
size
=
TSDB_FHDR_SIZE
;
int64_t
size
=
TSDB_FHDR_SIZE
;
int64_t
n
;
int64_t
n
;
uint8_t
*
pBuf
=
NULL
;
uint8_t
hdr
[
TSDB_FHDR_SIZE
]
;
SHeadFile
*
pHeadFile
=
&
pWriter
->
wSet
.
fHead
;
SHeadFile
*
pHeadFile
=
&
pWriter
->
wSet
.
fHead
;
SDataFile
*
pDataFile
=
&
pWriter
->
wSet
.
fData
;
SDataFile
*
pDataFile
=
&
pWriter
->
wSet
.
fData
;
SLastFile
*
pLastFile
=
&
pWriter
->
wSet
.
fLast
;
SLastFile
*
pLastFile
=
&
pWriter
->
wSet
.
fLast
;
SSmaFile
*
pSmaFile
=
&
pWriter
->
wSet
.
fSma
;
SSmaFile
*
pSmaFile
=
&
pWriter
->
wSet
.
fSma
;
// alloc
if
(
!
ppBuf
)
ppBuf
=
&
pBuf
;
code
=
tsdbRealloc
(
ppBuf
,
size
);
if
(
code
)
goto
_err
;
// head ==============
// head ==============
// build
code
=
tsdbUpdateDFileHdr
(
pWriter
->
pHeadFD
,
&
pWriter
->
wSet
,
TSDB_HEAD_FILE
);
memset
(
*
ppBuf
,
0
,
size
);
if
(
code
)
goto
_err
;
tPutDataFileHdr
(
*
ppBuf
,
&
pWriter
->
wSet
,
TSDB_HEAD_FILE
);
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
// seek
if
(
taosLSeekFile
(
pWriter
->
pHeadFD
,
0
,
SEEK_SET
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// write
n
=
taosWriteFile
(
pWriter
->
pHeadFD
,
*
ppBuf
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// data ==============
// data ==============
memset
(
*
ppBuf
,
0
,
size
);
code
=
tsdbUpdateDFileHdr
(
pWriter
->
pHeadFD
,
&
pWriter
->
wSet
,
TSDB_DATA_FILE
);
tPutDataFileHdr
(
*
ppBuf
,
&
pWriter
->
wSet
,
TSDB_DATA_FILE
);
if
(
code
)
goto
_err
;
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
// seek
if
(
taosLSeekFile
(
pWriter
->
pDataFD
,
0
,
SEEK_SET
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// write
n
=
taosWriteFile
(
pWriter
->
pDataFD
,
*
ppBuf
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// last ==============
// last ==============
memset
(
*
ppBuf
,
0
,
size
);
code
=
tsdbUpdateDFileHdr
(
pWriter
->
pHeadFD
,
&
pWriter
->
wSet
,
TSDB_LAST_FILE
);
tPutDataFileHdr
(
*
ppBuf
,
&
pWriter
->
wSet
,
TSDB_LAST_FILE
);
if
(
code
)
goto
_err
;
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
// seek
if
(
taosLSeekFile
(
pWriter
->
pLastFD
,
0
,
SEEK_SET
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// write
n
=
taosWriteFile
(
pWriter
->
pLastFD
,
*
ppBuf
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// sma ==============
// sma ==============
memset
(
*
ppBuf
,
0
,
size
);
code
=
tsdbUpdateDFileHdr
(
pWriter
->
pHeadFD
,
&
pWriter
->
wSet
,
TSDB_SMA_FILE
);
tPutDataFileHdr
(
*
ppBuf
,
&
pWriter
->
wSet
,
TSDB_SMA_FILE
);
if
(
code
)
goto
_err
;
taosCalcChecksumAppend
(
0
,
*
ppBuf
,
size
);
// seek
if
(
taosLSeekFile
(
pWriter
->
pSmaFD
,
0
,
SEEK_SET
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
// write
n
=
taosWriteFile
(
pWriter
->
pSmaFD
,
*
ppBuf
,
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
tsdbFree
(
pBuf
);
return
code
;
return
code
;
_err:
_err:
tsdbFree
(
pBuf
);
tsdbError
(
"vgId:%d update DFileSet header failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d update DFileSet header failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
return
code
;
}
}
...
@@ -1134,27 +1211,26 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
...
@@ -1134,27 +1211,26 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
pSubBlock
->
bsize
+=
n
;
pSubBlock
->
bsize
+=
n
;
// TSDBKEY
// TSDBKEY
pSubBlock
->
ksize
=
0
;
if
(
cmprAlg
==
NO_COMPRESSION
)
{
if
(
cmprAlg
==
NO_COMPRESSION
)
{
// TSKEY
cksm
=
0
;
size
=
sizeof
(
TSKEY
)
*
pBlockData
->
nRow
;
n
=
taosWriteFile
(
pFileFD
,
pBlockData
->
aTSKEY
,
size
);
// version
pSubBlock
->
vsize
=
sizeof
(
int64_t
)
*
pBlockData
->
nRow
;
n
=
taosWriteFile
(
pFileFD
,
pBlockData
->
aVersion
,
pSubBlock
->
vsize
);
if
(
n
<
0
)
{
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
pSubBlock
->
ksize
+=
size
;
cksm
=
taosCalcChecksum
(
cksm
,
(
uint8_t
*
)
pBlockData
->
aVersion
,
pSubBlock
->
vsize
);
cksm
=
taosCalcChecksum
(
0
,
(
uint8_t
*
)
pBlockData
->
aTSKEY
,
size
);
//
version
//
TSKEY
size
=
sizeof
(
int64_t
)
*
pBlockData
->
nRow
;
pSubBlock
->
ksize
=
sizeof
(
TSKEY
)
*
pBlockData
->
nRow
;
n
=
taosWriteFile
(
pFileFD
,
pBlockData
->
a
Version
,
size
);
n
=
taosWriteFile
(
pFileFD
,
pBlockData
->
a
TSKEY
,
pSubBlock
->
k
size
);
if
(
n
<
0
)
{
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
pSubBlock
->
ksize
+=
size
;
cksm
=
taosCalcChecksum
(
cksm
,
(
uint8_t
*
)
pBlockData
->
aTSKEY
,
pSubBlock
->
ksize
);
cksm
=
taosCalcChecksum
(
cksm
,
(
uint8_t
*
)
pBlockData
->
aVersion
,
size
);
// cksm
// cksm
size
=
sizeof
(
cksm
);
size
=
sizeof
(
cksm
);
...
@@ -1163,11 +1239,10 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
...
@@ -1163,11 +1239,10 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
pSubBlock
->
ksize
+=
size
;
}
else
{
}
else
{
ASSERT
(
cmprAlg
==
ONE_STAGE_COMP
||
cmprAlg
==
TWO_STAGE_COMP
);
ASSERT
(
cmprAlg
==
ONE_STAGE_COMP
||
cmprAlg
==
TWO_STAGE_COMP
);
size
=
(
sizeof
(
TSKEY
)
+
sizeof
(
int64_t
))
*
pBlockData
->
nRow
+
COMP_OVERFLOW_BYTES
*
2
+
sizeof
(
TSCKSUM
);
size
=
(
sizeof
(
int64_t
)
+
sizeof
(
TSKEY
))
*
pBlockData
->
nRow
+
COMP_OVERFLOW_BYTES
*
2
+
sizeof
(
TSCKSUM
);
code
=
tsdbRealloc
(
ppBuf1
,
size
);
code
=
tsdbRealloc
(
ppBuf1
,
size
);
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
...
@@ -1177,37 +1252,37 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
...
@@ -1177,37 +1252,37 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
}
}
//
TSKEY
//
version
n
=
tsCompress
Timestamp
((
char
*
)
pBlockData
->
aTSKEY
,
sizeof
(
TSKEY
)
*
pBlockData
->
nRow
,
pBlockData
->
nRow
,
*
ppBuf1
,
n
=
tsCompress
Bigint
((
char
*
)
pBlockData
->
aVersion
,
sizeof
(
int64_t
)
*
pBlockData
->
nRow
,
pBlockData
->
nRow
,
*
ppBuf1
,
size
,
cmprAlg
,
*
ppBuf2
,
size
);
size
,
cmprAlg
,
*
ppBuf2
,
size
);
if
(
n
<=
0
)
{
if
(
n
<=
0
)
{
code
=
TSDB_CODE_COMPRESS_ERROR
;
code
=
TSDB_CODE_COMPRESS_ERROR
;
goto
_err
;
goto
_err
;
}
}
pSubBlock
->
ksize
+
=
n
;
pSubBlock
->
vsize
=
n
;
//
version
//
TSKEY
n
=
tsCompress
Bigint
((
char
*
)
pBlockData
->
aVersion
,
sizeof
(
int64_t
)
*
pBlockData
->
nRow
,
pBlockData
->
nRow
,
n
=
tsCompress
Timestamp
((
char
*
)
pBlockData
->
aTSKEY
,
sizeof
(
TSKEY
)
*
pBlockData
->
nRow
,
pBlockData
->
nRow
,
*
ppBuf1
+
pSubBlock
->
ksize
,
size
-
pSubBlock
->
k
size
,
cmprAlg
,
*
ppBuf2
,
size
);
*
ppBuf1
+
pSubBlock
->
vsize
,
size
-
pSubBlock
->
v
size
,
cmprAlg
,
*
ppBuf2
,
size
);
if
(
n
<=
0
)
{
if
(
n
<=
0
)
{
code
=
TSDB_CODE_COMPRESS_ERROR
;
code
=
TSDB_CODE_COMPRESS_ERROR
;
goto
_err
;
goto
_err
;
}
}
pSubBlock
->
ksize
+
=
n
;
pSubBlock
->
ksize
=
n
;
// cksm
// cksm
pSubBlock
->
ksize
+=
sizeof
(
TSCKSUM
);
n
=
pSubBlock
->
vsize
+
pSubBlock
->
ksize
+
sizeof
(
TSCKSUM
);
ASSERT
(
pSubBlock
->
ksize
<=
size
);
ASSERT
(
n
<=
size
);
taosCalcChecksumAppend
(
0
,
*
ppBuf1
,
pSubBlock
->
ksize
);
taosCalcChecksumAppend
(
0
,
*
ppBuf1
,
n
);
// write
// write
n
=
taosWriteFile
(
pFileFD
,
*
ppBuf1
,
pSubBlock
->
ksize
);
n
=
taosWriteFile
(
pFileFD
,
*
ppBuf1
,
n
);
if
(
n
<
0
)
{
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
}
}
pSubBlock
->
bsize
+=
pSubBlock
->
ksize
;
pSubBlock
->
bsize
+=
(
pSubBlock
->
vsize
+
pSubBlock
->
ksize
+
sizeof
(
TSCKSUM
))
;
// other columns
// other columns
offset
=
0
;
offset
=
0
;
...
@@ -1226,19 +1301,18 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
...
@@ -1226,19 +1301,18 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
if
(
pColData
->
flag
!=
HAS_NULL
)
{
if
(
pColData
->
flag
!=
HAS_NULL
)
{
cksm
=
0
;
cksm
=
0
;
pBlockCol
->
offset
=
offset
;
pBlockCol
->
offset
=
offset
;
pBlockCol
->
size
=
0
;
// bitmap
// bitmap
if
(
pColData
->
flag
!=
HAS_VALUE
)
{
if
(
pColData
->
flag
==
HAS_VALUE
)
{
// optimize bitmap storage (todo)
pBlockCol
->
bsize
=
0
;
n
=
taosWriteFile
(
pFileFD
,
pColData
->
pBitMap
,
BIT2_SIZE
(
pBlockData
->
nRow
));
}
else
{
pBlockCol
->
bsize
=
BIT2_SIZE
(
pBlockData
->
nRow
);
n
=
taosWriteFile
(
pFileFD
,
pColData
->
pBitMap
,
pBlockCol
->
bsize
);
if
(
n
<
0
)
{
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
cksm
=
taosCalcChecksum
(
cksm
,
pColData
->
pBitMap
,
n
);
cksm
=
taosCalcChecksum
(
cksm
,
pColData
->
pBitMap
,
n
);
pBlockCol
->
size
+=
n
;
}
}
// data
// data
...
@@ -1249,7 +1323,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
...
@@ -1249,7 +1323,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
pBlockCol
->
size
+=
n
;
pBlockCol
->
csize
=
n
;
pBlockCol
->
osize
=
n
;
// checksum
// checksum
cksm
=
taosCalcChecksum
(
cksm
,
pColData
->
pData
,
pColData
->
nData
);
cksm
=
taosCalcChecksum
(
cksm
,
pColData
->
pData
,
pColData
->
nData
);
...
@@ -1258,7 +1333,6 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
...
@@ -1258,7 +1333,6 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
pBlockCol
->
size
+=
n
;
}
else
{
}
else
{
size
=
pColData
->
nData
+
COMP_OVERFLOW_BYTES
+
sizeof
(
TSCKSUM
);
size
=
pColData
->
nData
+
COMP_OVERFLOW_BYTES
+
sizeof
(
TSCKSUM
);
...
@@ -1277,6 +1351,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
...
@@ -1277,6 +1351,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code
=
TSDB_CODE_COMPRESS_ERROR
;
code
=
TSDB_CODE_COMPRESS_ERROR
;
goto
_err
;
goto
_err
;
}
}
pBlockCol
->
csize
=
n
;
pBlockCol
->
osize
=
pColData
->
nData
;
// cksm
// cksm
n
+=
sizeof
(
TSCKSUM
);
n
+=
sizeof
(
TSCKSUM
);
...
@@ -1289,13 +1365,11 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
...
@@ -1289,13 +1365,11 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
goto
_err
;
}
}
pBlockCol
->
size
+=
n
;
}
}
// state
// state
offset
+=
pBlockCol
->
size
;
offset
=
offset
+
pBlockCol
->
bsize
+
pBlockCol
->
csize
+
sizeof
(
TSCKSUM
)
;
pSubBlock
->
bsize
+=
pBlockCol
->
size
;
pSubBlock
->
bsize
=
pSubBlock
->
bsize
+
pBlockCol
->
bsize
+
pBlockCol
->
csize
+
sizeof
(
TSCKSUM
)
;
}
}
code
=
tMapDataPutItem
(
&
pSubBlock
->
mBlockCol
,
pBlockCol
,
tPutBlockCol
);
code
=
tMapDataPutItem
(
&
pSubBlock
->
mBlockCol
,
pBlockCol
,
tPutBlockCol
);
...
...
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
ed6276bd
...
@@ -355,6 +355,7 @@ void tBlockReset(SBlock *pBlock) {
...
@@ -355,6 +355,7 @@ void tBlockReset(SBlock *pBlock) {
pBlock
->
aSubBlock
[
iSubBlock
].
nRow
=
0
;
pBlock
->
aSubBlock
[
iSubBlock
].
nRow
=
0
;
pBlock
->
aSubBlock
[
iSubBlock
].
cmprAlg
=
-
1
;
pBlock
->
aSubBlock
[
iSubBlock
].
cmprAlg
=
-
1
;
pBlock
->
aSubBlock
[
iSubBlock
].
offset
=
-
1
;
pBlock
->
aSubBlock
[
iSubBlock
].
offset
=
-
1
;
pBlock
->
aSubBlock
[
iSubBlock
].
vsize
=
-
1
;
pBlock
->
aSubBlock
[
iSubBlock
].
ksize
=
-
1
;
pBlock
->
aSubBlock
[
iSubBlock
].
ksize
=
-
1
;
pBlock
->
aSubBlock
[
iSubBlock
].
bsize
=
-
1
;
pBlock
->
aSubBlock
[
iSubBlock
].
bsize
=
-
1
;
tMapDataReset
(
&
pBlock
->
aSubBlock
->
mBlockCol
);
tMapDataReset
(
&
pBlock
->
aSubBlock
->
mBlockCol
);
...
@@ -384,6 +385,7 @@ int32_t tPutBlock(uint8_t *p, void *ph) {
...
@@ -384,6 +385,7 @@ int32_t tPutBlock(uint8_t *p, void *ph) {
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlock
->
aSubBlock
[
iSubBlock
].
nRow
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlock
->
aSubBlock
[
iSubBlock
].
nRow
);
n
+=
tPutI8
(
p
?
p
+
n
:
p
,
pBlock
->
aSubBlock
[
iSubBlock
].
cmprAlg
);
n
+=
tPutI8
(
p
?
p
+
n
:
p
,
pBlock
->
aSubBlock
[
iSubBlock
].
cmprAlg
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlock
->
aSubBlock
[
iSubBlock
].
offset
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlock
->
aSubBlock
[
iSubBlock
].
offset
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlock
->
aSubBlock
[
iSubBlock
].
vsize
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlock
->
aSubBlock
[
iSubBlock
].
ksize
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlock
->
aSubBlock
[
iSubBlock
].
ksize
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlock
->
aSubBlock
[
iSubBlock
].
bsize
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlock
->
aSubBlock
[
iSubBlock
].
bsize
);
n
+=
tPutMapData
(
p
?
p
+
n
:
p
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
mBlockCol
);
n
+=
tPutMapData
(
p
?
p
+
n
:
p
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
mBlockCol
);
...
@@ -408,6 +410,7 @@ int32_t tGetBlock(uint8_t *p, void *ph) {
...
@@ -408,6 +410,7 @@ int32_t tGetBlock(uint8_t *p, void *ph) {
n
+=
tGetI64v
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
nRow
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
nRow
);
n
+=
tGetI8
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
cmprAlg
);
n
+=
tGetI8
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
cmprAlg
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
offset
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
offset
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
vsize
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
ksize
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
ksize
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
bsize
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
bsize
);
n
+=
tGetMapData
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
mBlockCol
);
n
+=
tGetMapData
(
p
+
n
,
&
pBlock
->
aSubBlock
[
iSubBlock
].
mBlockCol
);
...
@@ -443,7 +446,13 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) {
...
@@ -443,7 +446,13 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) {
if
(
pBlockCol
->
flag
!=
HAS_NULL
)
{
if
(
pBlockCol
->
flag
!=
HAS_NULL
)
{
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlockCol
->
offset
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlockCol
->
offset
);
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlockCol
->
size
);
if
(
pBlockCol
->
flag
!=
HAS_VALUE
)
{
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlockCol
->
bsize
);
}
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlockCol
->
csize
);
if
(
IS_VAR_DATA_TYPE
(
pBlockCol
->
type
))
{
n
+=
tPutI64v
(
p
?
p
+
n
:
p
,
pBlockCol
->
osize
);
}
}
}
return
n
;
return
n
;
...
@@ -461,7 +470,17 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
...
@@ -461,7 +470,17 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
if
(
pBlockCol
->
flag
!=
HAS_NULL
)
{
if
(
pBlockCol
->
flag
!=
HAS_NULL
)
{
n
+=
tGetI64v
(
p
+
n
,
&
pBlockCol
->
offset
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlockCol
->
offset
);
n
+=
tGetI64v
(
p
+
n
,
&
pBlockCol
->
size
);
if
(
pBlockCol
->
flag
!=
HAS_VALUE
)
{
n
+=
tGetI64v
(
p
+
n
,
&
pBlockCol
->
bsize
);
}
else
{
pBlockCol
->
bsize
=
0
;
}
n
+=
tGetI64v
(
p
+
n
,
&
pBlockCol
->
csize
);
if
(
IS_VAR_DATA_TYPE
(
pBlockCol
->
type
))
{
n
+=
tGetI64v
(
p
+
n
,
&
pBlockCol
->
osize
);
}
else
{
pBlockCol
->
osize
=
-
1
;
}
}
}
return
n
;
return
n
;
...
@@ -921,6 +940,29 @@ _exit:
...
@@ -921,6 +940,29 @@ _exit:
return
code
;
return
code
;
}
}
int32_t
tColDataCopy
(
SColData
*
pColDataSrc
,
SColData
*
pColDataDest
)
{
int32_t
code
=
0
;
pColDataDest
->
cid
=
pColDataDest
->
cid
;
pColDataDest
->
type
=
pColDataDest
->
type
;
pColDataDest
->
offsetValid
=
0
;
pColDataDest
->
nVal
=
pColDataSrc
->
nVal
;
pColDataDest
->
flag
=
pColDataSrc
->
flag
;
if
(
pColDataSrc
->
flag
!=
HAS_NONE
&&
pColDataSrc
->
flag
!=
HAS_NULL
&&
pColDataSrc
->
flag
!=
HAS_VALUE
)
{
code
=
tsdbRealloc
(
&
pColDataDest
->
pBitMap
,
BIT2_SIZE
(
pColDataDest
->
nVal
));
if
(
code
)
goto
_exit
;
memcpy
(
pColDataDest
->
pBitMap
,
pColDataSrc
->
pBitMap
,
BIT2_SIZE
(
pColDataSrc
->
nVal
));
}
pColDataDest
->
nData
=
pColDataSrc
->
nData
;
code
=
tsdbRealloc
(
&
pColDataDest
->
pData
,
pColDataSrc
->
nData
);
if
(
code
)
goto
_exit
;
memcpy
(
pColDataDest
->
pData
,
pColDataSrc
->
pData
,
pColDataSrc
->
nData
);
_exit:
return
code
;
}
static
int32_t
tColDataUpdateOffset
(
SColData
*
pColData
)
{
static
int32_t
tColDataUpdateOffset
(
SColData
*
pColData
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
SValue
value
;
SValue
value
;
...
@@ -1039,24 +1081,30 @@ void tBlockDataClear(SBlockData *pBlockData) {
...
@@ -1039,24 +1081,30 @@ void tBlockDataClear(SBlockData *pBlockData) {
taosArrayDestroyEx
(
pBlockData
->
aColData
,
tColDataClear
);
taosArrayDestroyEx
(
pBlockData
->
aColData
,
tColDataClear
);
}
}
static
SColData
*
tBlockDataAddBlockCol
(
SBlockData
*
pBlockData
,
int32_t
iColData
,
int16_t
cid
,
int8_t
type
)
{
int32_t
tBlockDataAddColData
(
SBlockData
*
pBlockData
,
int32_t
iColData
,
SColData
**
ppColData
)
{
int32_t
code
=
0
;
SColData
*
pColData
=
NULL
;
SColData
*
pColData
=
NULL
;
int32_t
idx
=
taosArrayGetSize
(
pBlockData
->
aColDataP
);
int32_t
idx
=
taosArrayGetSize
(
pBlockData
->
aColDataP
);
if
(
idx
>=
taosArrayGetSize
(
pBlockData
->
aColData
))
{
if
(
idx
>=
taosArrayGetSize
(
pBlockData
->
aColData
))
{
if
(
taosArrayPush
(
pBlockData
->
aColData
,
&
((
SColData
){
0
}))
==
NULL
)
return
NULL
;
if
(
taosArrayPush
(
pBlockData
->
aColData
,
&
((
SColData
){
0
}))
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
}
}
pColData
=
(
SColData
*
)
taosArrayGet
(
pBlockData
->
aColData
,
idx
);
pColData
=
(
SColData
*
)
taosArrayGet
(
pBlockData
->
aColData
,
idx
);
tColDataReset
(
pColData
,
cid
,
type
);
if
(
taosArrayInsert
(
pBlockData
->
aColDataP
,
iColData
,
&
pColData
)
==
NULL
)
return
NULL
;
// append NONE
if
(
taosArrayInsert
(
pBlockData
->
aColDataP
,
iColData
,
&
pColData
)
==
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pBlockData
->
nRow
;
i
++
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
if
(
tColDataAppendValue
(
pColData
,
&
COL_VAL_NONE
(
cid
,
type
))
!=
0
)
return
NULL
;
goto
_err
;
}
}
return
pColData
;
*
ppColData
=
pColData
;
return
code
;
_err:
*
ppColData
=
NULL
;
return
code
;
}
}
int32_t
tBlockDataAppendRow
(
SBlockData
*
pBlockData
,
TSDBROW
*
pRow
,
STSchema
*
pTSchema
)
{
int32_t
tBlockDataAppendRow
(
SBlockData
*
pBlockData
,
TSDBROW
*
pRow
,
STSchema
*
pTSchema
)
{
...
@@ -1092,10 +1140,14 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
...
@@ -1092,10 +1140,14 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
code
=
tColDataAppendValue
(
pColData
,
&
(
COL_VAL_NONE
(
pColData
->
cid
,
pColData
->
type
)));
code
=
tColDataAppendValue
(
pColData
,
&
(
COL_VAL_NONE
(
pColData
->
cid
,
pColData
->
type
)));
if
(
code
)
goto
_err
;
if
(
code
)
goto
_err
;
}
else
{
}
else
{
pColData
=
tBlockDataAddBlockCol
(
pBlockData
,
iColData
,
pColVal
->
cid
,
pColVal
->
type
);
code
=
tBlockDataAddColData
(
pBlockData
,
iColData
,
&
pColData
);
if
(
pColData
==
NULL
)
{
if
(
code
)
goto
_err
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
// append a NONE
tColDataReset
(
pColData
,
pColVal
->
cid
,
pColVal
->
type
);
for
(
int32_t
iRow
=
0
;
iRow
<
pBlockData
->
nRow
;
iRow
++
)
{
code
=
tColDataAppendValue
(
pColData
,
&
COL_VAL_NONE
(
pColVal
->
cid
,
pColVal
->
type
));
if
(
code
)
goto
_err
;
}
}
code
=
tColDataAppendValue
(
pColData
,
pColVal
);
code
=
tColDataAppendValue
(
pColData
,
pColVal
);
...
@@ -1119,10 +1171,13 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
...
@@ -1119,10 +1171,13 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
}
}
while
(
pColVal
)
{
while
(
pColVal
)
{
pColData
=
tBlockDataAddBlockCol
(
pBlockData
,
iColData
,
pColVal
->
cid
,
pColVal
->
type
);
code
=
tBlockDataAddColData
(
pBlockData
,
iColData
,
&
pColData
);
if
(
pColData
==
NULL
)
{
if
(
code
)
goto
_err
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
tColDataReset
(
pColData
,
pColVal
->
cid
,
pColVal
->
type
);
for
(
int32_t
iRow
=
0
;
iRow
<
pBlockData
->
nRow
;
iRow
++
)
{
code
=
tColDataAppendValue
(
pColData
,
&
COL_VAL_NONE
(
pColVal
->
cid
,
pColVal
->
type
));
if
(
code
)
goto
_err
;
}
}
code
=
tColDataAppendValue
(
pColData
,
pColVal
);
code
=
tColDataAppendValue
(
pColData
,
pColVal
);
...
@@ -1138,3 +1193,85 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
...
@@ -1138,3 +1193,85 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
_err:
_err:
return
code
;
return
code
;
}
}
int32_t
tBlockDataMerge
(
SBlockData
*
pBlockData1
,
SBlockData
*
pBlockData2
,
SBlockData
*
pBlockData
)
{
int32_t
code
=
0
;
tBlockDataReset
(
pBlockData
);
// loop to merge
int32_t
iRow1
=
0
;
int32_t
nRow1
=
pBlockData1
->
nRow
;
int32_t
iRow2
=
0
;
int32_t
nRow2
=
pBlockData2
->
nRow
;
TSDBROW
row1
;
TSDBROW
row2
;
int32_t
c
;
while
(
iRow1
<
nRow1
&&
iRow2
<
nRow2
)
{
row1
=
tsdbRowFromBlockData
(
pBlockData1
,
iRow1
);
row2
=
tsdbRowFromBlockData
(
pBlockData2
,
iRow2
);
c
=
tsdbKeyCmprFn
(
&
TSDBROW_KEY
(
&
row1
),
&
TSDBROW_KEY
(
&
row2
));
if
(
c
<
0
)
{
code
=
tBlockDataAppendRow
(
pBlockData
,
&
row1
,
NULL
);
if
(
code
)
goto
_exit
;
iRow1
++
;
}
else
if
(
c
>
0
)
{
code
=
tBlockDataAppendRow
(
pBlockData
,
&
row2
,
NULL
);
if
(
code
)
goto
_exit
;
iRow2
++
;
}
else
{
ASSERT
(
0
);
}
}
while
(
iRow1
<
nRow1
)
{
row1
=
tsdbRowFromBlockData
(
pBlockData1
,
iRow1
);
code
=
tBlockDataAppendRow
(
pBlockData
,
&
row1
,
NULL
);
if
(
code
)
goto
_exit
;
iRow1
++
;
}
while
(
iRow2
<
nRow2
)
{
row2
=
tsdbRowFromBlockData
(
pBlockData2
,
iRow2
);
code
=
tBlockDataAppendRow
(
pBlockData
,
&
row2
,
NULL
);
if
(
code
)
goto
_exit
;
iRow2
++
;
}
_exit:
return
code
;
}
int32_t
tBlockDataCopy
(
SBlockData
*
pBlockDataSrc
,
SBlockData
*
pBlockDataDest
)
{
int32_t
code
=
0
;
SColData
*
pColDataSrc
;
SColData
*
pColDataDest
;
ASSERT
(
pBlockDataSrc
->
nRow
>
0
);
tBlockDataReset
(
pBlockDataDest
);
pBlockDataDest
->
nRow
=
pBlockDataSrc
->
nRow
;
// TSDBKEY
code
=
tsdbRealloc
((
uint8_t
**
)
&
pBlockDataDest
->
aVersion
,
sizeof
(
int64_t
)
*
pBlockDataSrc
->
nRow
);
if
(
code
)
goto
_exit
;
code
=
tsdbRealloc
((
uint8_t
**
)
&
pBlockDataDest
->
aTSKEY
,
sizeof
(
TSKEY
)
*
pBlockDataSrc
->
nRow
);
if
(
code
)
goto
_exit
;
memcpy
(
pBlockDataDest
->
aVersion
,
pBlockDataSrc
->
aVersion
,
sizeof
(
int64_t
)
*
pBlockDataSrc
->
nRow
);
memcpy
(
pBlockDataDest
->
aTSKEY
,
pBlockDataSrc
->
aTSKEY
,
sizeof
(
TSKEY
)
*
pBlockDataSrc
->
nRow
);
// other
for
(
size_t
iColData
=
0
;
iColData
<
taosArrayGetSize
(
pBlockDataSrc
->
aColDataP
);
iColData
++
)
{
pColDataSrc
=
(
SColData
*
)
taosArrayGetP
(
pBlockDataSrc
->
aColDataP
,
iColData
);
code
=
tBlockDataAddColData
(
pBlockDataDest
,
iColData
,
&
pColDataDest
);
if
(
code
)
goto
_exit
;
code
=
tColDataCopy
(
pColDataSrc
,
pColDataDest
);
if
(
code
)
goto
_exit
;
}
_exit:
return
code
;
}
\ No newline at end of file
source/libs/executor/src/executil.c
浏览文件 @
ed6276bd
...
@@ -620,8 +620,6 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
...
@@ -620,8 +620,6 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
}
}
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
)
{
int32_t
initQueryTableDataCond
(
SQueryTableDataCond
*
pCond
,
const
STableScanPhysiNode
*
pTableScanNode
)
{
pCond
->
loadExternalRows
=
false
;
pCond
->
order
=
pTableScanNode
->
scanSeq
[
0
]
>
0
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pCond
->
order
=
pTableScanNode
->
scanSeq
[
0
]
>
0
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pCond
->
numOfCols
=
LIST_LENGTH
(
pTableScanNode
->
scan
.
pScanCols
);
pCond
->
numOfCols
=
LIST_LENGTH
(
pTableScanNode
->
scan
.
pScanCols
);
pCond
->
colList
=
taosMemoryCalloc
(
pCond
->
numOfCols
,
sizeof
(
SColumnInfo
));
pCond
->
colList
=
taosMemoryCalloc
(
pCond
->
numOfCols
,
sizeof
(
SColumnInfo
));
...
@@ -647,15 +645,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
...
@@ -647,15 +645,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
}
}
#endif
#endif
for
(
int32_t
i
=
0
;
i
<
pCond
->
numOfTWindows
;
++
i
)
{
pCond
->
type
=
BLOCK_LOAD_OFFSET_ORDER
;
if
((
pCond
->
order
==
TSDB_ORDER_ASC
&&
pCond
->
twindows
[
i
].
skey
>
pCond
->
twindows
[
i
].
ekey
)
||
(
pCond
->
order
==
TSDB_ORDER_DESC
&&
pCond
->
twindows
[
i
].
skey
<
pCond
->
twindows
[
i
].
ekey
))
{
TSWAP
(
pCond
->
twindows
[
i
].
skey
,
pCond
->
twindows
[
i
].
ekey
);
}
}
taosqsort
(
pCond
->
twindows
,
pCond
->
numOfTWindows
,
sizeof
(
STimeWindow
),
pCond
,
compareTimeWindow
);
pCond
->
type
=
BLOCK_LOAD_OFFSET_SEQ_ORDER
;
// pCond->type = pTableScanNode->scanFlag;
// pCond->type = pTableScanNode->scanFlag;
int32_t
j
=
0
;
int32_t
j
=
0
;
...
@@ -677,6 +667,5 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
...
@@ -677,6 +667,5 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
}
}
void
cleanupQueryTableDataCond
(
SQueryTableDataCond
*
pCond
)
{
void
cleanupQueryTableDataCond
(
SQueryTableDataCond
*
pCond
)
{
taosMemoryFree
(
pCond
->
twindows
);
taosMemoryFree
(
pCond
->
colList
);
taosMemoryFree
(
pCond
->
colList
);
}
}
\ No newline at end of file
source/libs/executor/src/executorimpl.c
浏览文件 @
ed6276bd
...
@@ -4306,7 +4306,11 @@ STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle
...
@@ -4306,7 +4306,11 @@ STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle
}
}
STsdbReader
*
pReader
;
STsdbReader
*
pReader
;
tsdbReaderOpen
(
pHandle
->
vnode
,
&
cond
,
pTableListInfo
,
queryId
,
taskId
,
&
pReader
);
code
=
tsdbReaderOpen
(
pHandle
->
vnode
,
&
cond
,
pTableListInfo
,
queryId
,
taskId
,
&
pReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
cleanupQueryTableDataCond
(
&
cond
);
cleanupQueryTableDataCond
(
&
cond
);
return
pReader
;
return
pReader
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录