Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
0d8cc797
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0d8cc797
编写于
3月 25, 2020
作者:
H
hzcheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-34
上级
492ff11f
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
32 addition
and
57 deletion
+32
-57
src/common/inc/dataformat.h
src/common/inc/dataformat.h
+2
-1
src/common/src/dataformat.c
src/common/src/dataformat.c
+11
-6
src/vnode/tsdb/src/tsdbMain.c
src/vnode/tsdb/src/tsdbMain.c
+19
-50
未找到文件。
src/common/inc/dataformat.h
浏览文件 @
0d8cc797
...
@@ -108,6 +108,7 @@ typedef struct SDataCol {
...
@@ -108,6 +108,7 @@ typedef struct SDataCol {
int8_t
type
;
int8_t
type
;
int
bytes
;
int
bytes
;
int
len
;
int
len
;
int
offset
;
void
*
pData
;
void
*
pData
;
}
SDataCol
;
}
SDataCol
;
...
@@ -129,7 +130,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
...
@@ -129,7 +130,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
void
tdResetDataCols
(
SDataCols
*
pCols
);
void
tdResetDataCols
(
SDataCols
*
pCols
);
void
tdInitDataCols
(
SDataCols
*
pCols
,
STSchema
*
pSchema
);
void
tdInitDataCols
(
SDataCols
*
pCols
,
STSchema
*
pSchema
);
void
tdFreeDataCols
(
SDataCols
*
pCols
);
void
tdFreeDataCols
(
SDataCols
*
pCols
);
void
tdAppendDataRowToDataCol
(
SDataRow
row
,
SDataCols
*
pCols
,
STSchema
*
pSchema
);
void
tdAppendDataRowToDataCol
(
SDataRow
row
,
SDataCols
*
pCols
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
src/common/src/dataformat.c
浏览文件 @
0d8cc797
...
@@ -317,8 +317,13 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
...
@@ -317,8 +317,13 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
pCols
->
numOfCols
=
schemaNCols
(
pSchema
);
pCols
->
numOfCols
=
schemaNCols
(
pSchema
);
pCols
->
cols
[
0
].
pData
=
pCols
->
buf
;
pCols
->
cols
[
0
].
pData
=
pCols
->
buf
;
for
(
int
i
=
1
;
i
<
schemaNCols
(
pSchema
);
i
++
)
{
for
(
int
i
=
0
;
i
<
schemaNCols
(
pSchema
);
i
++
)
{
pCols
->
cols
[
i
].
pData
=
(
char
*
)(
pCols
->
cols
[
i
-
1
].
pData
)
+
schemaColAt
(
pSchema
,
i
-
1
)
->
bytes
*
pCols
->
maxPoints
;
if
(
i
>
0
)
{
pCols
->
cols
[
i
].
pData
=
(
char
*
)(
pCols
->
cols
[
i
-
1
].
pData
)
+
schemaColAt
(
pSchema
,
i
-
1
)
->
bytes
*
pCols
->
maxPoints
;
}
pCols
->
cols
[
i
].
type
=
colType
(
schemaColAt
(
pSchema
,
i
));
pCols
->
cols
[
i
].
bytes
=
colBytes
(
schemaColAt
(
pSchema
,
i
));
pCols
->
cols
[
i
].
offset
=
colOffset
(
schemaColAt
(
pSchema
,
i
));
}
}
return
pCols
;
return
pCols
;
...
@@ -338,14 +343,14 @@ void tdResetDataCols(SDataCols *pCols) {
...
@@ -338,14 +343,14 @@ void tdResetDataCols(SDataCols *pCols) {
}
}
}
}
void
tdAppendDataRowToDataCol
(
SDataRow
row
,
SDataCols
*
pCols
,
STSchema
*
pSchema
)
{
void
tdAppendDataRowToDataCol
(
SDataRow
row
,
SDataCols
*
pCols
)
{
TSKEY
key
=
dataRowKey
(
row
);
TSKEY
key
=
dataRowKey
(
row
);
for
(
int
i
=
0
;
i
<
pCols
->
numOfCols
;
i
++
)
{
for
(
int
i
=
0
;
i
<
pCols
->
numOfCols
;
i
++
)
{
SDataCol
*
pCol
=
pCols
->
cols
+
i
;
SDataCol
*
pCol
=
pCols
->
cols
+
i
;
memcpy
((
void
*
)((
char
*
)(
pCol
->
pData
)
+
pCol
->
len
),
dataRowAt
(
row
,
colOffset
(
schemaColAt
(
pSchema
,
i
))),
memcpy
((
void
*
)((
char
*
)(
pCol
->
pData
)
+
pCol
->
len
),
dataRowAt
(
row
,
pCol
->
offset
),
pCol
->
bytes
);
colBytes
(
schemaColAt
(
pSchema
,
i
)));
pCol
->
len
+=
pCol
->
bytes
;
pCol
->
len
+=
colBytes
(
schemaColAt
(
pSchema
,
i
));
}
}
pCols
->
numOfPoints
++
;
}
}
/**
/**
...
...
src/vnode/tsdb/src/tsdbMain.c
浏览文件 @
0d8cc797
...
@@ -85,6 +85,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
...
@@ -85,6 +85,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
static
int32_t
tsdbRestoreCfg
(
STsdbRepo
*
pRepo
,
STsdbCfg
*
pCfg
);
static
int32_t
tsdbRestoreCfg
(
STsdbRepo
*
pRepo
,
STsdbCfg
*
pCfg
);
static
int32_t
tsdbGetDataDirName
(
STsdbRepo
*
pRepo
,
char
*
fname
);
static
int32_t
tsdbGetDataDirName
(
STsdbRepo
*
pRepo
,
char
*
fname
);
static
void
*
tsdbCommitData
(
void
*
arg
);
static
void
*
tsdbCommitData
(
void
*
arg
);
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SSkipListIterator
**
iters
,
SDataCols
*
pCols
);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
...
@@ -761,7 +762,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
...
@@ -761,7 +762,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
return
0
;
return
0
;
}
}
static
int
tsdbReadRowsFromCache
(
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCol
**
cols
,
STSchema
*
pSchema
)
{
static
int
tsdbReadRowsFromCache
(
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCol
s
*
pCols
)
{
int
numOfRows
=
0
;
int
numOfRows
=
0
;
do
{
do
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
...
@@ -769,12 +770,8 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
...
@@ -769,12 +770,8 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
SDataRow
row
=
SL_GET_NODE_DATA
(
node
);
SDataRow
row
=
SL_GET_NODE_DATA
(
node
);
if
(
dataRowKey
(
row
)
>
maxKey
)
break
;
if
(
dataRowKey
(
row
)
>
maxKey
)
break
;
// Convert row data to column data
// for (int i = 0; i < schemaNCols(pSchema); i++) {
tdAppendDataRowToDataCol
(
row
,
pCols
);
// STColumn *pCol = schemaColAt(pSchema, i);
// memcpy(cols[i]->data + TYPE_BYTES[colType(pCol)] * numOfRows, dataRowAt(row, pCol->offset),
// TYPE_BYTES[colType(pCol)]);
// }
numOfRows
++
;
numOfRows
++
;
if
(
numOfRows
>
maxRowsToRead
)
break
;
if
(
numOfRows
>
maxRowsToRead
)
break
;
...
@@ -823,7 +820,7 @@ static void *tsdbCommitData(void *arg) {
...
@@ -823,7 +820,7 @@ static void *tsdbCommitData(void *arg) {
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbCache
*
pCache
=
pRepo
->
tsdbCache
;
STsdbCache
*
pCache
=
pRepo
->
tsdbCache
;
STsdbCfg
*
pCfg
=
&
(
pRepo
->
config
);
STsdbCfg
*
pCfg
=
&
(
pRepo
->
config
);
if
(
pCache
->
imem
==
NULL
)
return
;
if
(
pCache
->
imem
==
NULL
)
return
NULL
;
// Create the iterator to read from cache
// Create the iterator to read from cache
SSkipListIterator
**
iters
=
tsdbCreateTableIters
(
pMeta
,
pCfg
->
maxTables
);
SSkipListIterator
**
iters
=
tsdbCreateTableIters
(
pMeta
,
pCfg
->
maxTables
);
...
@@ -832,52 +829,23 @@ static void *tsdbCommitData(void *arg) {
...
@@ -832,52 +829,23 @@ static void *tsdbCommitData(void *arg) {
return
NULL
;
return
NULL
;
}
}
int
maxCols
=
pMeta
->
maxCols
;
// Create a data column buffer for commit
int
maxBytes
=
pMeta
->
maxRowBytes
;
SDataCols
*
pCols
=
tdNewDataCols
(
pMeta
->
maxRowBytes
,
pMeta
->
maxCols
,
pCfg
->
maxRowsPerFileBlock
);
SDataCol
**
cols
=
(
SDataCol
**
)
malloc
(
sizeof
(
SDataCol
*
)
*
maxCols
);
if
(
pCols
==
NULL
)
{
void
*
buf
=
malloc
((
maxBytes
+
sizeof
(
SDataCol
))
*
pCfg
->
maxRowsPerFileBlock
);
// TODO: deal with the error
return
NULL
;
}
int
sfid
=
tsdbGetKeyFileId
(
pCache
->
imem
->
keyFirst
,
pCfg
->
daysPerFile
,
pCfg
->
precision
);
int
sfid
=
tsdbGetKeyFileId
(
pCache
->
imem
->
keyFirst
,
pCfg
->
daysPerFile
,
pCfg
->
precision
);
int
efid
=
tsdbGetKeyFileId
(
pCache
->
imem
->
keyLast
,
pCfg
->
daysPerFile
,
pCfg
->
precision
);
int
efid
=
tsdbGetKeyFileId
(
pCache
->
imem
->
keyLast
,
pCfg
->
daysPerFile
,
pCfg
->
precision
);
for
(
int
fid
=
sfid
;
fid
<=
efid
;
fid
++
)
{
for
(
int
fid
=
sfid
;
fid
<=
efid
;
fid
++
)
{
TSKEY
minKey
=
0
,
maxKey
=
0
;
tsdbCommitToFile
(
pRepo
,
fid
,
iters
,
pCols
);
tsdbGetKeyRangeOfFileId
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
fid
,
&
minKey
,
&
maxKey
);
// tsdbOpenFileForWrite(pRepo, fid);
for
(
int
tid
=
0
;
tid
<
pCfg
->
maxTables
;
tid
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
tid
];
if
(
pTable
==
NULL
||
pTable
->
imem
==
NULL
)
continue
;
if
(
iters
[
tid
]
==
NULL
)
{
// create table iterator
iters
[
tid
]
=
tSkipListCreateIter
(
pTable
->
imem
->
pData
);
// TODO: deal with the error
if
(
iters
[
tid
]
==
NULL
)
break
;
if
(
!
tSkipListIterNext
(
iters
[
tid
]))
{
// assert(0);
}
}
// Init row data part
cols
[
0
]
=
(
SDataCol
*
)
buf
;
for
(
int
col
=
1
;
col
<
schemaNCols
(
pTable
->
schema
);
col
++
)
{
cols
[
col
]
=
(
SDataCol
*
)((
char
*
)(
cols
[
col
-
1
])
+
sizeof
(
SDataCol
)
+
colBytes
(
schemaColAt
(
pTable
->
schema
,
col
-
1
))
*
pCfg
->
maxRowsPerFileBlock
);
}
// Loop the iterator
int
rowsRead
=
0
;
while
((
rowsRead
=
tsdbReadRowsFromCache
(
iters
[
tid
],
maxKey
,
pCfg
->
maxRowsPerFileBlock
,
cols
,
pTable
->
schema
))
>
0
)
{
// printf("rowsRead:%d-----------\n", rowsRead);
int
k
=
0
;
}
}
}
}
tdFreeDataCols
(
pCols
);
tsdbDestroyTableIters
(
iters
,
pCfg
->
maxTables
);
tsdbDestroyTableIters
(
iters
,
pCfg
->
maxTables
);
free
(
buf
);
free
(
cols
);
tsdbLockRepo
(
arg
);
tsdbLockRepo
(
arg
);
tdListMove
(
pCache
->
imem
->
list
,
pCache
->
pool
.
memPool
);
tdListMove
(
pCache
->
imem
->
list
,
pCache
->
pool
.
memPool
);
...
@@ -896,7 +864,7 @@ static void *tsdbCommitData(void *arg) {
...
@@ -896,7 +864,7 @@ static void *tsdbCommitData(void *arg) {
return
NULL
;
return
NULL
;
}
}
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
SSkipListIterator
**
iters
,
int
fid
)
{
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SSkipListIterator
**
iters
,
SDataCols
*
pCols
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
...
@@ -908,11 +876,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, SSkipListIterator **iters, int fid
...
@@ -908,11 +876,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, SSkipListIterator **iters, int fid
SSkipListIterator
*
pIter
=
iters
[
tid
];
SSkipListIterator
*
pIter
=
iters
[
tid
];
if
(
pIter
==
NULL
)
continue
;
if
(
pIter
==
NULL
)
continue
;
tdInitDataCols
(
pCols
,
pTable
->
schema
);
// Read data
while
(
tsdbReadRowsFromCache
(
pIter
,
maxKey
,
pCfg
->
maxRowsPerFileBlock
,
pCols
))
{
// while () {
// TODO
int
k
=
0
;
//
}
}
}
}
return
0
;
return
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录