Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
爱划水de鲸鱼哥~
TDengine
提交
bbf36777
T
TDengine
项目概览
爱划水de鲸鱼哥~
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
bbf36777
编写于
9月 27, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-1548
上级
153288bd
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
518 addition
and
353 deletion
+518
-353
src/common/inc/tdataformat.h
src/common/inc/tdataformat.h
+55
-19
src/common/src/tdataformat.c
src/common/src/tdataformat.c
+43
-26
src/inc/taosdef.h
src/inc/taosdef.h
+1
-0
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+19
-3
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+1
-1
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+144
-54
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+6
-5
src/tsdb/src/tsdbRWHelper.c
src/tsdb/src/tsdbRWHelper.c
+219
-166
src/util/inc/tskiplist.h
src/util/inc/tskiplist.h
+7
-15
src/util/src/tskiplist.c
src/util/src/tskiplist.c
+23
-64
未找到文件。
src/common/inc/tdataformat.h
浏览文件 @
bbf36777
...
...
@@ -119,6 +119,33 @@ void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
int
tdAddColToSchema
(
STSchemaBuilder
*
pBuilder
,
int8_t
type
,
int16_t
colId
,
int16_t
bytes
);
STSchema
*
tdGetSchemaFromBuilder
(
STSchemaBuilder
*
pBuilder
);
// ----------------- Semantic timestamp key definition
typedef
uint64_t
TKEY
;
#define TKEY_INVALID UINT64_MAX
#define TKEY_NULL TKEY_INVALID
#define TKEY_NEGATIVE_FLAG (((TKEY)1) << (sizeof(TKEY) * 8 - 1))
#define TKEY_DELETE_FLAG (((TKEY)1) << (sizeof(TKEY) * 8 - 2))
#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG | TKEY_DELETE_FLAG))
#define TKEY_IS_NEGATIVE(tkey) (((tkey)&TKEY_NEGATIVE_FLAG) != 0)
#define TKEY_IS_DELETED(tkey) (((tkey)&TKEY_DELETE_FLAG) != 0)
#define tdSetTKEYDeleted(tkey) ((tkey) | TKEY_DELETE_FLAG)
#define tdGetTKEY(key) (((TKEY)ABS(key)) | (TKEY_NEGATIVE_FLAG & (TKEY)(key)))
#define tdGetKey(tkey) (((TSKEY)((tkey)&TKEY_VALUE_FILTER)) * (TKEY_IS_NEGATIVE(tkey) ? -1 : 1))
static
FORCE_INLINE
int
tkeyComparFn
(
const
void
*
tkey1
,
const
void
*
tkey2
)
{
TSKEY
key1
=
tdGetKey
(
*
(
TKEY
*
)
tkey1
);
TSKEY
key2
=
tdGetKey
(
*
(
TKEY
*
)
tkey2
);
if
(
key1
<
key2
)
{
return
-
1
;
}
else
if
(
key1
>
key2
)
{
return
1
;
}
else
{
return
0
;
}
}
// ----------------- Data row structure
/* A data row, the format is like below:
...
...
@@ -129,6 +156,8 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder);
* +----------+----------+---------------------------------+---------------------------------+
* | len | sversion | First part | Second part |
* +----------+----------+---------------------------------+---------------------------------+
*
* NOTE: timestamp in this row structure is TKEY instead of TSKEY
*/
typedef
void
*
SDataRow
;
...
...
@@ -137,11 +166,13 @@ typedef void *SDataRow;
#define dataRowLen(r) (*(uint16_t *)(r))
#define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
#define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r)))
#define dataRowKey(r) tdGetKey(dataRowTKey(r))
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
#define dataRowSetVersion(r, v) (dataRowVersion(r) = (v))
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE)
#define dataRowDeleted(r) TKEY_IS_DELETED(dataRowTKey(r))
SDataRow
tdNewDataRowFromSchema
(
STSchema
*
pSchema
);
void
tdFreeDataRow
(
SDataRow
row
);
...
...
@@ -154,16 +185,18 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i
int32_t
toffset
=
offset
+
TD_DATA_ROW_HEAD_SIZE
;
char
*
ptr
=
(
char
*
)
POINTER_SHIFT
(
row
,
dataRowLen
(
row
));
switch
(
type
)
{
case
TSDB_DATA_TYPE_BINARY
:
case
TSDB_DATA_TYPE_NCHAR
:
*
(
VarDataOffsetT
*
)
POINTER_SHIFT
(
row
,
toffset
)
=
dataRowLen
(
row
);
memcpy
(
ptr
,
value
,
varDataTLen
(
value
));
dataRowLen
(
row
)
+=
varDataTLen
(
value
);
break
;
default:
if
(
IS_VAR_DATA_TYPE
(
type
))
{
*
(
VarDataOffsetT
*
)
POINTER_SHIFT
(
row
,
toffset
)
=
dataRowLen
(
row
);
memcpy
(
ptr
,
value
,
varDataTLen
(
value
));
dataRowLen
(
row
)
+=
varDataTLen
(
value
);
}
else
{
if
(
offset
==
0
)
{
ASSERT
(
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
TKEY
tvalue
=
tdGetTKEY
(
*
(
TSKEY
*
)
value
);
memcpy
(
POINTER_SHIFT
(
row
,
toffset
),
&
tvalue
,
TYPE_BYTES
[
type
]);
}
else
{
memcpy
(
POINTER_SHIFT
(
row
,
toffset
),
value
,
TYPE_BYTES
[
type
]);
break
;
}
}
return
0
;
...
...
@@ -171,12 +204,10 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i
// NOTE: offset here including the header size
static
FORCE_INLINE
void
*
tdGetRowDataOfCol
(
SDataRow
row
,
int8_t
type
,
int32_t
offset
)
{
switch
(
type
)
{
case
TSDB_DATA_TYPE_BINARY
:
case
TSDB_DATA_TYPE_NCHAR
:
return
POINTER_SHIFT
(
row
,
*
(
VarDataOffsetT
*
)
POINTER_SHIFT
(
row
,
offset
));
default:
return
POINTER_SHIFT
(
row
,
offset
);
if
(
IS_VAR_DATA_TYPE
(
type
))
{
return
POINTER_SHIFT
(
row
,
*
(
VarDataOffsetT
*
)
POINTER_SHIFT
(
row
,
offset
));
}
else
{
return
POINTER_SHIFT
(
row
,
offset
);
}
}
...
...
@@ -243,9 +274,14 @@ typedef struct {
}
SDataCols
;
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsKeyAt(pCols, idx) ((TSKEY *)(keyCol(pCols)->pData))[(idx)]
#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0)
#define dataColsKeyLast(pCols) ((pCols->numOfRows == 0) ? 0 : dataColsKeyAt(pCols, (pCols)->numOfRows - 1))
#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)]
#define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx))
#define dataColsTKeyFirst(pCols) ((pCols)->numOfRows == 0) ? TKEY_INVALID : dataColsTKeyAt(pCols, 0)
#define dataColsKeyFirst(pCols) ((pCols)->numOfRows == 0) ? TSDB_DATA_BIGINT_NULL : dataColsKeyAt(pCols, 0)
#define dataColsTKeyLast(pCols) \
(((pCols)->numOfRows == 0) ? TKEY_INVALID : dataColsTKeyAt(pCols, (pCols)->numOfRows - 1))
#define dataColsKeyLast(pCols) \
(((pCols)->numOfRows == 0) ? TSDB_DATA_BIGINT_NULL : dataColsKeyAt(pCols, (pCols)->numOfRows - 1))
SDataCols
*
tdNewDataCols
(
int
maxRowSize
,
int
maxCols
,
int
maxRows
);
void
tdResetDataCols
(
SDataCols
*
pCols
);
...
...
src/common/src/tdataformat.c
浏览文件 @
bbf36777
...
...
@@ -423,30 +423,41 @@ void tdResetDataCols(SDataCols *pCols) {
}
void
tdAppendDataRowToDataCol
(
SDataRow
row
,
STSchema
*
pSchema
,
SDataCols
*
pCols
)
{
ASSERT
(
dataColsKeyLast
(
pCols
)
<
dataRowKey
(
row
));
ASSERT
(
pCols
->
numOfRows
==
0
||
dataColsKeyLast
(
pCols
)
<
dataRowKey
(
row
));
int
rcol
=
0
;
int
dcol
=
0
;
while
(
dcol
<
pCols
->
numOfCols
)
{
SDataCol
*
pDataCol
=
&
(
pCols
->
cols
[
dcol
]);
if
(
rcol
>=
schemaNCols
(
pSchema
))
{
dataColSetNullAt
(
pDataCol
,
pCols
->
numOfRows
);
dcol
++
;
continue
;
if
(
dataRowDeleted
(
row
))
{
for
(;
dcol
<
pCols
->
numOfCols
;
dcol
++
)
{
SDataCol
*
pDataCol
=
&
(
pCols
->
cols
[
dcol
]);
if
(
dcol
==
0
)
{
dataColAppendVal
(
pDataCol
,
dataRowTuple
(
row
),
pCols
->
numOfRows
,
pCols
->
maxPoints
);
}
else
{
dataColSetNullAt
(
pDataCol
,
pCols
->
numOfRows
);
}
}
}
else
{
while
(
dcol
<
pCols
->
numOfCols
)
{
SDataCol
*
pDataCol
=
&
(
pCols
->
cols
[
dcol
]);
if
(
rcol
>=
schemaNCols
(
pSchema
))
{
dataColSetNullAt
(
pDataCol
,
pCols
->
numOfRows
);
dcol
++
;
continue
;
}
STColumn
*
pRowCol
=
schemaColAt
(
pSchema
,
rcol
);
if
(
pRowCol
->
colId
==
pDataCol
->
colId
)
{
void
*
value
=
tdGetRowDataOfCol
(
row
,
pRowCol
->
type
,
pRowCol
->
offset
+
TD_DATA_ROW_HEAD_SIZE
);
dataColAppendVal
(
pDataCol
,
value
,
pCols
->
numOfRows
,
pCols
->
maxPoints
);
dcol
++
;
rcol
++
;
}
else
if
(
pRowCol
->
colId
<
pDataCol
->
colId
)
{
rcol
++
;
}
else
{
dataColSetNullAt
(
pDataCol
,
pCols
->
numOfRows
);
dcol
++
;
STColumn
*
pRowCol
=
schemaColAt
(
pSchema
,
rcol
);
if
(
pRowCol
->
colId
==
pDataCol
->
colId
)
{
void
*
value
=
tdGetRowDataOfCol
(
row
,
pRowCol
->
type
,
pRowCol
->
offset
+
TD_DATA_ROW_HEAD_SIZE
);
dataColAppendVal
(
pDataCol
,
value
,
pCols
->
numOfRows
,
pCols
->
maxPoints
);
dcol
++
;
rcol
++
;
}
else
if
(
pRowCol
->
colId
<
pDataCol
->
colId
)
{
rcol
++
;
}
else
{
dataColSetNullAt
(
pDataCol
,
pCols
->
numOfRows
);
dcol
++
;
}
}
}
pCols
->
numOfRows
++
;
...
...
@@ -511,8 +522,12 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
while
(
target
->
numOfRows
<
tRows
)
{
if
(
*
iter1
>=
limit1
&&
*
iter2
>=
limit2
)
break
;
TSKEY
key1
=
(
*
iter1
>=
limit1
)
?
INT64_MAX
:
((
TSKEY
*
)(
src1
->
cols
[
0
].
pData
))[
*
iter1
];
TSKEY
key2
=
(
*
iter2
>=
limit2
)
?
INT64_MAX
:
((
TSKEY
*
)(
src2
->
cols
[
0
].
pData
))[
*
iter2
];
TSKEY
key1
=
(
*
iter1
>=
limit1
)
?
INT64_MAX
:
dataColsKeyAt
(
src1
,
*
iter1
);
TKEY
tkey1
=
(
*
iter1
>=
limit1
)
?
TKEY_NULL
:
dataColsTKeyAt
(
src1
,
*
iter1
);
TSKEY
key2
=
(
*
iter2
>=
limit2
)
?
INT64_MAX
:
dataColsKeyAt
(
src2
,
*
iter2
);
TKEY
tkey2
=
(
*
iter2
>=
limit2
)
?
TKEY_NULL
:
dataColsTKeyAt
(
src2
,
*
iter2
);
ASSERT
(
tkey1
==
TKEY_NULL
||
(
!
TKEY_IS_DELETED
(
tkey1
)));
if
(
key1
<
key2
)
{
for
(
int
i
=
0
;
i
<
src1
->
numOfCols
;
i
++
)
{
...
...
@@ -525,12 +540,14 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
target
->
numOfRows
++
;
(
*
iter1
)
++
;
}
else
{
for
(
int
i
=
0
;
i
<
src2
->
numOfCols
;
i
++
)
{
ASSERT
(
target
->
cols
[
i
].
type
==
src2
->
cols
[
i
].
type
);
if
(
src2
->
cols
[
i
].
len
>
0
)
{
dataColAppendVal
(
&
(
target
->
cols
[
i
]),
tdGetColDataOfRow
(
src2
->
cols
+
i
,
*
iter2
),
target
->
numOfRows
,
target
->
maxPoints
);
}
else
if
(
key1
>=
key2
)
{
if
((
key1
>
key2
)
||
(
key1
==
key2
&&
!
TKEY_IS_DELETED
(
tkey2
)))
{
for
(
int
i
=
0
;
i
<
src2
->
numOfCols
;
i
++
)
{
ASSERT
(
target
->
cols
[
i
].
type
==
src2
->
cols
[
i
].
type
);
if
(
src2
->
cols
[
i
].
len
>
0
)
{
dataColAppendVal
(
&
(
target
->
cols
[
i
]),
tdGetColDataOfRow
(
src2
->
cols
+
i
,
*
iter2
),
target
->
numOfRows
,
target
->
maxPoints
);
}
}
}
...
...
src/inc/taosdef.h
浏览文件 @
bbf36777
...
...
@@ -75,6 +75,7 @@ extern const int32_t TYPE_BYTES[11];
#define TSDB_DATA_SMALLINT_NULL 0x8000
#define TSDB_DATA_INT_NULL 0x80000000
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
#define TSDB_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
#define TSDB_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define TSDB_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
bbf36777
...
...
@@ -320,6 +320,15 @@ typedef struct {
void
*
compBuffer
;
// Buffer for temperary compress/decompress purpose
}
SRWHelper
;
typedef
struct
{
int
rowsInserted
;
int
rowsUpdated
;
int
rowsDeleteSucceed
;
int
rowsDeleteFailed
;
int
nOperations
;
TSKEY
keyFirst
;
TSKEY
keyLast
;
}
SMergeInfo
;
// ------------------ tsdbScan.c
typedef
struct
{
SFileGroup
fGroup
;
...
...
@@ -422,7 +431,7 @@ void tsdbCloseBufPool(STsdbRepo* pRepo);
SListNode
*
tsdbAllocBufBlockFromPool
(
STsdbRepo
*
pRepo
);
// ------------------ tsdbMemTable.c
int
tsdb
InsertRowTo
Mem
(
STsdbRepo
*
pRepo
,
SDataRow
row
,
STable
*
pTable
);
int
tsdb
UpdateRowIn
Mem
(
STsdbRepo
*
pRepo
,
SDataRow
row
,
STable
*
pTable
);
int
tsdbRefMemTable
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMemTable
);
int
tsdbUnRefMemTable
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMemTable
);
int
tsdbTakeMemSnapshot
(
STsdbRepo
*
pRepo
,
SMemTable
**
pMem
,
SMemTable
**
pIMem
);
...
...
@@ -430,7 +439,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem)
void
*
tsdbAllocBytes
(
STsdbRepo
*
pRepo
,
int
bytes
);
int
tsdbAsyncCommit
(
STsdbRepo
*
pRepo
);
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
T
SKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
);
T
KEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
static
FORCE_INLINE
SDataRow
tsdbNextIterRow
(
SSkipListIterator
*
pIter
)
{
if
(
pIter
==
NULL
)
return
NULL
;
...
...
@@ -443,11 +452,18 @@ static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
static
FORCE_INLINE
TSKEY
tsdbNextIterKey
(
SSkipListIterator
*
pIter
)
{
SDataRow
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
return
-
1
;
if
(
row
==
NULL
)
return
TSDB_DATA_TIMESTAMP_NULL
;
return
dataRowKey
(
row
);
}
static
FORCE_INLINE
TKEY
tsdbNextIterTKey
(
SSkipListIterator
*
pIter
)
{
SDataRow
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
return
TKEY_NULL
;
return
dataRowTKey
(
row
);
}
static
FORCE_INLINE
STsdbBufBlock
*
tsdbGetCurrBufBlock
(
STsdbRepo
*
pRepo
)
{
ASSERT
(
pRepo
!=
NULL
);
if
(
pRepo
->
mem
==
NULL
)
return
NULL
;
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
bbf36777
...
...
@@ -765,7 +765,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
return
-
1
;
}
if
(
tsdb
InsertRowTo
Mem
(
pRepo
,
row
,
pTable
)
<
0
)
return
-
1
;
if
(
tsdb
UpdateRowIn
Mem
(
pRepo
,
row
,
pTable
)
<
0
)
return
-
1
;
(
*
affectedrows
)
++
;
points
++
;
...
...
src/tsdb/src/tsdbMemTable.c
浏览文件 @
bbf36777
...
...
@@ -32,14 +32,31 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
static
SCommitIter
*
tsdbCreateCommitIters
(
STsdbRepo
*
pRepo
);
static
void
tsdbDestroyCommitIters
(
SCommitIter
*
iters
,
int
maxTables
);
static
int
tsdbAdjustMemMaxTables
(
SMemTable
*
pMemTable
,
int
maxTables
);
static
int
tsdbAppendTableRowToCols
(
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
SDataRow
row
);
// ---------------- INTERNAL FUNCTIONS ----------------
int
tsdb
InsertRowTo
Mem
(
STsdbRepo
*
pRepo
,
SDataRow
row
,
STable
*
pTable
)
{
int
tsdb
UpdateRowIn
Mem
(
STsdbRepo
*
pRepo
,
SDataRow
row
,
STable
*
pTable
)
{
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
TKEY
tkey
=
dataRowTKey
(
row
);
TSKEY
key
=
dataRowKey
(
row
);
SMemTable
*
pMemTable
=
pRepo
->
mem
;
STableData
*
pTableData
=
NULL
;
bool
isRowDelete
=
TKEY_IS_DELETED
(
tkey
);
if
(
isRowDelete
)
{
if
(
!
pCfg
->
update
)
{
tsdbWarn
(
"vgId:%d vnode is not allowed to update but try to delete a data row"
,
REPO_ID
(
pRepo
));
terrno
=
TSDB_CODE_TDB_INVALID_ACTION
;
return
-
1
;
}
if
(
key
>
TABLE_LASTKEY
(
pTable
))
{
tsdbTrace
(
"vgId:%d skip to delete row key %"
PRId64
" which is larger than table lastKey %"
PRId64
,
REPO_ID
(
pRepo
),
key
,
TABLE_LASTKEY
(
pTable
));
return
0
;
}
}
void
*
pRow
=
tsdbAllocBytes
(
pRepo
,
dataRowLen
(
row
));
if
(
pRow
==
NULL
)
{
...
...
@@ -88,8 +105,10 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
if
(
tSkipListPut
(
pTableData
->
pData
,
pRow
)
==
NULL
)
{
tsdbFreeBytes
(
pRepo
,
(
void
*
)
pRow
,
dataRowLen
(
row
));
}
else
{
// TODO: may need to refact here
int64_t
deltaSize
=
SL_SIZE
(
pTableData
->
pData
)
-
oldSize
;
if
(
TABLE_LASTKEY
(
pTable
)
<
key
)
TABLE_LASTKEY
(
pTable
)
=
key
;
if
((
!
isRowDelete
)
&&
(
TABLE_LASTKEY
(
pTable
)
<
key
))
TABLE_LASTKEY
(
pTable
)
=
key
;
if
(
pMemTable
->
keyFirst
>
key
)
pMemTable
->
keyFirst
=
key
;
if
(
pMemTable
->
keyLast
<
key
)
pMemTable
->
keyLast
=
key
;
pMemTable
->
numOfRows
+=
deltaSize
;
...
...
@@ -99,8 +118,9 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
pTableData
->
numOfRows
+=
deltaSize
;
}
tsdbTrace
(
"vgId:%d a row is inserted to table %s tid %d uid %"
PRIu64
" key %"
PRIu64
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_UID
(
pTable
),
key
);
tsdbTrace
(
"vgId:%d a row is %s table %s tid %d uid %"
PRIu64
" key %"
PRIu64
,
REPO_ID
(
pRepo
),
isRowDelete
?
"deleted from"
:
"updated in"
,
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_UID
(
pTable
),
key
);
return
0
;
}
...
...
@@ -270,66 +290,120 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
return
0
;
}
/**
* This is an important function to load data or try to load data from memory skiplist iterator.
*
* This function load memory data until:
* 1. iterator ends
* 2. data key exceeds maxKey
* 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
* 4. operations in pCols not exceeds its max capacity if pCols is given
*
* The function try to move as mush as possible.
*/
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
T
SKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
)
{
ASSERT
(
maxRowsToRead
>
0
&&
nFilterKeys
>=
0
);
T
KEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
)
{
ASSERT
(
maxRowsToRead
>
0
&&
nFilterKeys
>=
0
&&
pMergeInfo
!=
NULL
);
if
(
pIter
==
NULL
)
return
0
;
STSchema
*
pSchema
=
NULL
;
int
numOfRows
=
0
;
TSKEY
keyNext
=
0
;
TSKEY
rowKey
=
0
;
TSKEY
fKey
=
0
;
bool
isRowDel
=
false
;
int
filterIter
=
0
;
SDataRow
row
=
NULL
;
memset
(
pMergeInfo
,
0
,
sizeof
(
*
pMergeInfo
));
pMergeInfo
->
keyFirst
=
INT64_MAX
;
pMergeInfo
->
keyLast
=
INT64_MIN
;
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
dataRowKey
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
}
else
{
rowKey
=
dataRowKey
(
row
);
isRowDel
=
dataRowDeleted
(
row
);
}
if
(
nFilterKeys
==
0
||
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
while
(
true
)
{
if
(
fKey
==
INT64_MAX
&&
rowKey
==
INT64_MAX
)
break
;
if
(
fKey
<
rowKey
)
{
pMergeInfo
->
keyFirst
=
MIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
MAX
(
pMergeInfo
->
keyLast
,
fKey
);
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
}
else
if
(
fKey
>
rowKey
)
{
if
(
isRowDel
)
{
pMergeInfo
->
rowsDeleteFailed
++
;
}
else
{
if
(
pMergeInfo
->
rowsInserted
-
pMergeInfo
->
rowsDeleteSucceed
>=
maxRowsToRead
)
break
;
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsInserted
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
MIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
MAX
(
pMergeInfo
->
keyLast
,
rowKey
);
tsdbAppendTableRowToCols
(
pTable
,
pCols
,
&
pSchema
,
row
);
}
if
(
nFilterKeys
!=
0
)
{
// for filter purpose
ASSERT
(
filterKeys
!=
NULL
);
keyNext
=
tsdbNextIterKey
(
pIter
);
if
(
keyNext
<
0
||
keyNext
>
maxKey
)
return
numOfRows
;
void
*
ptr
=
taosbsearch
((
void
*
)(
&
keyNext
),
(
void
*
)
filterKeys
,
nFilterKeys
,
sizeof
(
TSKEY
),
compTSKEY
,
TD_GE
);
filterIter
=
(
ptr
==
NULL
)
?
nFilterKeys
:
(
int
)((
POINTER_DISTANCE
(
ptr
,
filterKeys
)
/
sizeof
(
TSKEY
)));
}
do
{
SDataRow
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
break
;
keyNext
=
dataRowKey
(
row
);
if
(
keyNext
>
maxKey
)
break
;
bool
keyFiltered
=
false
;
if
(
nFilterKeys
!=
0
)
{
while
(
true
)
{
if
(
filterIter
>=
nFilterKeys
)
break
;
if
(
keyNext
==
filterKeys
[
filterIter
])
{
keyFiltered
=
true
;
filterIter
++
;
break
;
}
else
if
(
keyNext
<
filterKeys
[
filterIter
])
{
break
;
tSkipListIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
dataRowKey
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
}
else
{
rowKey
=
dataRowKey
(
row
);
isRowDel
=
dataRowDeleted
(
row
);
}
}
else
{
if
(
isRowDel
)
{
ASSERT
(
!
keepDup
);
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsDeleteSucceed
++
;
pMergeInfo
->
nOperations
++
;
tsdbAppendTableRowToCols
(
pTable
,
pCols
,
&
pSchema
,
row
);
}
else
{
if
(
keepDup
)
{
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsUpdated
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
MIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
MAX
(
pMergeInfo
->
keyLast
,
rowKey
);
tsdbAppendTableRowToCols
(
pTable
,
pCols
,
&
pSchema
,
row
);
}
else
{
filterIter
++
;
pMergeInfo
->
keyFirst
=
MIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
MAX
(
pMergeInfo
->
keyLast
,
fKey
);
}
}
}
if
(
!
keyFiltered
)
{
if
(
numOfRows
>=
maxRowsToRead
)
break
;
numOfRows
++
;
}
if
(
!
keyFiltered
||
keepDup
)
{
if
(
pCols
)
{
if
(
pSchema
==
NULL
||
schemaVersion
(
pSchema
)
!=
dataRowVersion
(
row
)
)
{
pSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
dataRowVersion
(
row
))
;
if
(
pSchema
==
NULL
)
{
ASSERT
(
0
);
}
}
tSkipListIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
dataRowKey
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
}
else
{
rowKey
=
dataRowKey
(
row
);
isRowDel
=
dataRowDeleted
(
row
);
}
tdAppendDataRowToDataCol
(
row
,
pSchema
,
pCols
);
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
}
}
while
(
tSkipListIterNext
(
pIter
));
}
}
return
numOfRows
;
return
0
;
}
// ---------------- LOCAL FUNCTIONS ----------------
...
...
@@ -420,7 +494,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
pTableData
->
pData
=
tSkipListCreate
(
TSDB_DATA_SKIPLIST_LEVEL
,
TSDB_DATA_TYPE_TIMESTAMP
,
TYPE_BYTES
[
TSDB_DATA_TYPE_TIMESTAMP
],
pCfg
->
update
?
SL_UPDATE_DUP_KEY
:
SL_DISCARD_DUP_KEY
,
tsdbGetTsTupleKey
);
tkeyComparFn
,
pCfg
->
update
?
SL_UPDATE_DUP_KEY
:
SL_DISCARD_DUP_KEY
,
tsdbGetTsTupleKey
);
if
(
pTableData
->
pData
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
...
...
@@ -562,7 +636,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo) {
static
int
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
)
{
for
(
int
i
=
0
;
i
<
nIters
;
i
++
)
{
TSKEY
nextKey
=
tsdbNextIterKey
((
iters
+
i
)
->
pIter
);
if
(
nextKey
>
0
&&
(
nextKey
>=
minKey
&&
nextKey
<=
maxKey
))
return
1
;
if
(
nextKey
!=
TSDB_DATA_TIMESTAMP_NULL
&&
(
nextKey
>=
minKey
&&
nextKey
<=
maxKey
))
return
1
;
}
return
0
;
}
...
...
@@ -758,5 +832,21 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
taosTFree
(
tData
);
return
0
;
}
static
int
tsdbAppendTableRowToCols
(
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
SDataRow
row
)
{
if
(
pCols
)
{
if
(
*
ppSchema
==
NULL
||
schemaVersion
(
*
ppSchema
)
!=
dataRowVersion
(
row
))
{
*
ppSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
dataRowVersion
(
row
));
if
(
*
ppSchema
==
NULL
)
{
ASSERT
(
false
);
return
-
1
;
}
}
tdAppendDataRowToDataCol
(
row
,
*
ppSchema
,
pCols
);
}
return
0
;
}
\ No newline at end of file
src/tsdb/src/tsdbMeta.c
浏览文件 @
bbf36777
...
...
@@ -86,7 +86,8 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
if
(
pTable
!=
NULL
)
{
tsdbError
(
"vgId:%d table %s already exists, tid %d uid %"
PRId64
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_UID
(
pTable
));
return
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
;
terrno
=
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
;
goto
_err
;
}
if
(
pCfg
->
type
==
TSDB_CHILD_TABLE
)
{
...
...
@@ -700,7 +701,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper) {
}
pTable
->
tagVal
=
NULL
;
STColumn
*
pCol
=
schemaColAt
(
pTable
->
tagSchema
,
DEFAULT_TAG_INDEX_COLUMN
);
pTable
->
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
colType
(
pCol
),
(
uint8_t
)(
colBytes
(
pCol
)),
SL_ALLOW_DUP_KEY
,
getTagIndexKey
);
pTable
->
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
colType
(
pCol
),
(
uint8_t
)(
colBytes
(
pCol
)),
NULL
,
SL_ALLOW_DUP_KEY
,
getTagIndexKey
);
if
(
pTable
->
pIndex
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
...
...
@@ -745,7 +746,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper) {
T_REF_INC
(
pTable
);
tsdb
Trace
(
"table %s tid %d uid %"
PRIu64
" is created"
,
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
tsdb
Debug
(
"table %s tid %d uid %"
PRIu64
" is created"
,
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_UID
(
pTable
));
return
pTable
;
...
...
@@ -1155,8 +1156,8 @@ static void *tsdbDecodeTable(void *buf, STable **pRTable) {
if
(
TABLE_TYPE
(
pTable
)
==
TSDB_SUPER_TABLE
)
{
buf
=
tdDecodeSchema
(
buf
,
&
(
pTable
->
tagSchema
));
STColumn
*
pCol
=
schemaColAt
(
pTable
->
tagSchema
,
DEFAULT_TAG_INDEX_COLUMN
);
pTable
->
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
colType
(
pCol
),
(
uint8_t
)(
colBytes
(
pCol
)),
SL_ALLOW_DUP_KEY
,
getTagIndexKey
);
pTable
->
pIndex
=
tSkipListCreate
(
TSDB_SUPER_TABLE_SL_LEVEL
,
colType
(
pCol
),
(
uint8_t
)(
colBytes
(
pCol
)),
NULL
,
SL_ALLOW_DUP_KEY
,
getTagIndexKey
);
if
(
pTable
->
pIndex
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbFreeTable
(
pTable
);
...
...
src/tsdb/src/tsdbRWHelper.c
浏览文件 @
bbf36777
...
...
@@ -27,6 +27,7 @@
#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM))
#define TSDB_KEY_COL_OFFSET 0
#define TSDB_GET_COMPBLOCK_IDX(h, b) (POINTER_DISTANCE(b, (h)->pCompInfo->blocks)/sizeof(SCompBlock))
#define TSDB_IS_LAST_BLOCK(pb) ((pb)->last)
static
bool
tsdbShouldCreateNewLast
(
SRWHelper
*
pHelper
);
static
int
tsdbWriteBlockToFile
(
SRWHelper
*
pHelper
,
SFile
*
pFile
,
SDataCols
*
pDataCols
,
SCompBlock
*
pCompBlock
,
...
...
@@ -34,7 +35,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pD
static
int
compareKeyBlock
(
const
void
*
arg1
,
const
void
*
arg2
);
static
int
tsdbAdjustInfoSizeIfNeeded
(
SRWHelper
*
pHelper
,
size_t
esize
);
static
int
tsdbInsertSuperBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
);
static
int
tsdbAddSubBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
,
int
rowsAdded
);
static
int
tsdbAddSubBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
,
SMergeInfo
*
pMergeInfo
);
static
int
tsdbUpdateSuperBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
);
static
void
tsdbResetHelperFileImpl
(
SRWHelper
*
pHelper
);
static
int
tsdbInitHelperFile
(
SRWHelper
*
pHelper
);
...
...
@@ -61,8 +62,10 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pComp
static
int
tsdbWriteBlockToProperFile
(
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
,
SCompBlock
*
pCompBlock
);
static
int
tsdbProcessMergeCommit
(
SRWHelper
*
pHelper
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pDataCols
,
TSKEY
maxKey
,
int
*
blkIdx
);
static
int
tsdbLoadAndMergeFromCache
(
SDataCols
*
pDataCols
,
int
*
iter
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pTarget
,
static
void
tsdbLoadAndMergeFromCache
(
SDataCols
*
pDataCols
,
int
*
iter
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pTarget
,
TSKEY
maxKey
,
int
maxRows
,
int8_t
update
);
static
bool
tsdbCheckAddSubBlockCond
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
SMergeInfo
*
pMergeInfo
,
int
maxOps
);
static
int
tsdbDeleteSuperBlock
(
SRWHelper
*
pHelper
,
int
blkIdx
);
// ---------------------- INTERNAL FUNCTIONS ----------------------
int
tsdbInitReadHelper
(
SRWHelper
*
pHelper
,
STsdbRepo
*
pRepo
)
{
...
...
@@ -279,7 +282,7 @@ int tsdbCommitTableData(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols
while
(
true
)
{
ASSERT
(
blkIdx
<=
(
int
)
pIdx
->
numOfBlocks
);
TSKEY
keyFirst
=
tsdbNextIterKey
(
pCommitIter
->
pIter
);
if
(
keyFirst
<
0
||
keyFirst
>
maxKey
)
break
;
// iter over
if
(
keyFirst
==
TSDB_DATA_TIMESTAMP_NULL
||
keyFirst
>
maxKey
)
break
;
// iter over
if
(
pIdx
->
len
<=
0
||
keyFirst
>
pIdx
->
maxKey
)
{
if
(
tsdbProcessAppendCommit
(
pHelper
,
pCommitIter
,
pDataCols
,
maxKey
)
<
0
)
return
-
1
;
...
...
@@ -925,7 +928,7 @@ _err:
return
-
1
;
}
static
int
tsdbAddSubBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
,
int
rowsAdded
)
{
static
int
tsdbAddSubBlock
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
int
blkIdx
,
SMergeInfo
*
pMergeInfo
)
{
ASSERT
(
pCompBlock
->
numOfSubBlocks
==
0
);
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
...
...
@@ -958,9 +961,9 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
pSCompBlock
->
numOfSubBlocks
++
;
ASSERT
(
pSCompBlock
->
numOfSubBlocks
<=
TSDB_MAX_SUBBLOCKS
);
pSCompBlock
->
len
+=
sizeof
(
SCompBlock
);
pSCompBlock
->
numOfRows
+=
rowsAdd
ed
;
pSCompBlock
->
keyFirst
=
MIN
(
pSCompBlock
->
keyFirst
,
pCompBlock
->
keyFirst
)
;
pSCompBlock
->
keyLast
=
MAX
(
pSCompBlock
->
keyLast
,
pCompBlock
->
keyLast
)
;
pSCompBlock
->
numOfRows
=
pSCompBlock
->
numOfRows
+
pMergeInfo
->
rowsInserted
-
pMergeInfo
->
rowsDeleteSucce
ed
;
pSCompBlock
->
keyFirst
=
pMergeInfo
->
keyFirst
;
pSCompBlock
->
keyLast
=
pMergeInfo
->
keyLast
;
pIdx
->
len
+=
sizeof
(
SCompBlock
);
}
else
{
// Need to create two sub-blocks
void
*
ptr
=
NULL
;
...
...
@@ -989,11 +992,11 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
((
SCompBlock
*
)
ptr
)[
1
]
=
*
pCompBlock
;
pSCompBlock
->
numOfSubBlocks
=
2
;
pSCompBlock
->
numOfRows
+=
rowsAdd
ed
;
pSCompBlock
->
numOfRows
=
pSCompBlock
->
numOfRows
+
pMergeInfo
->
rowsInserted
-
pMergeInfo
->
rowsDeleteSucce
ed
;
pSCompBlock
->
offset
=
((
char
*
)
ptr
)
-
((
char
*
)
pHelper
->
pCompInfo
);
pSCompBlock
->
len
=
sizeof
(
SCompBlock
)
*
2
;
pSCompBlock
->
keyFirst
=
MIN
(((
SCompBlock
*
)
ptr
)[
0
].
keyFirst
,
((
SCompBlock
*
)
ptr
)[
1
].
keyFirst
)
;
pSCompBlock
->
keyLast
=
MAX
(((
SCompBlock
*
)
ptr
)[
0
].
keyLast
,
((
SCompBlock
*
)
ptr
)[
1
].
keyLast
)
;
pSCompBlock
->
keyFirst
=
pMergeInfo
->
keyFirst
;
pSCompBlock
->
keyLast
=
pMergeInfo
->
keyLast
;
pIdx
->
len
+=
(
sizeof
(
SCompBlock
)
*
2
);
}
...
...
@@ -1047,6 +1050,45 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
return
0
;
}
static
int
tsdbDeleteSuperBlock
(
SRWHelper
*
pHelper
,
int
blkIdx
)
{
SCompIdx
*
pCompIdx
=
&
(
pHelper
->
curCompIdx
);
ASSERT
(
pCompIdx
->
numOfBlocks
>
0
&&
blkIdx
<
pCompIdx
->
numOfBlocks
);
SCompBlock
*
pCompBlock
=
blockAtIdx
(
pHelper
,
blkIdx
);
SCompBlock
compBlock
=
*
pCompBlock
;
ASSERT
(
pCompBlock
->
numOfSubBlocks
>
0
&&
pCompBlock
->
numOfSubBlocks
<=
TSDB_MAX_SUBBLOCKS
);
if
(
pCompIdx
->
numOfBlocks
==
1
)
{
memset
(
pCompIdx
,
0
,
sizeof
(
*
pCompIdx
));
}
else
{
int
tsize
=
0
;
if
(
compBlock
.
numOfSubBlocks
>
1
)
{
tsize
=
pCompIdx
->
len
-
(
compBlock
.
offset
+
sizeof
(
SCompBlock
)
*
compBlock
.
numOfSubBlocks
);
ASSERT
(
tsize
>
0
);
memmove
(
POINTER_SHIFT
(
pHelper
->
pCompInfo
,
compBlock
.
offset
),
POINTER_SHIFT
(
pHelper
->
pCompInfo
,
compBlock
.
offset
+
sizeof
(
SCompBlock
)
*
compBlock
.
numOfSubBlocks
),
tsize
);
pCompIdx
->
len
=
pCompIdx
->
len
-
sizeof
(
SCompBlock
)
*
compBlock
.
numOfSubBlocks
;
}
tsize
=
pCompIdx
->
len
-
POINTER_DISTANCE
(
blockAtIdx
(
pHelper
,
blkIdx
+
1
),
pHelper
->
pCompInfo
);
ASSERT
(
tsize
>
0
);
memmove
((
void
*
)
blockAtIdx
(
pHelper
,
blkIdx
),
(
void
*
)
blockAtIdx
(
pHelper
,
blkIdx
+
1
),
tsize
);
pCompIdx
->
len
-=
sizeof
(
SCompBlock
);
pCompIdx
->
numOfBlocks
--
;
pCompIdx
->
hasLast
=
blockAtIdx
(
pHelper
,
pCompIdx
->
numOfBlocks
-
1
)
->
last
;
pCompIdx
->
maxKey
=
blockAtIdx
(
pHelper
,
pCompIdx
->
numOfBlocks
-
1
)
->
keyLast
;
}
return
0
;
}
static
void
tsdbResetHelperFileImpl
(
SRWHelper
*
pHelper
)
{
pHelper
->
idxH
.
numOfIdx
=
0
;
pHelper
->
idxH
.
curIdx
=
0
;
...
...
@@ -1439,12 +1481,14 @@ static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) {
}
static
int
tsdbProcessAppendCommit
(
SRWHelper
*
pHelper
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pDataCols
,
TSKEY
maxKey
)
{
STsdbCfg
*
pCfg
=
&
(
pHelper
->
pRepo
->
config
);
STable
*
pTable
=
pCommitIter
->
pTable
;
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
TSKEY
keyFirst
=
tsdbNextIterKey
(
pCommitIter
->
pIter
);
int
defaultRowsInBlock
=
pCfg
->
maxRowsPerFileBlock
*
4
/
5
;
SCompBlock
compBlock
=
{
0
};
STsdbCfg
*
pCfg
=
&
(
pHelper
->
pRepo
->
config
);
STable
*
pTable
=
pCommitIter
->
pTable
;
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
TSKEY
keyFirst
=
tsdbNextIterKey
(
pCommitIter
->
pIter
);
int
defaultRowsInBlock
=
pCfg
->
maxRowsPerFileBlock
*
4
/
5
;
SCompBlock
compBlock
=
{
0
};
SMergeInfo
mergeInfo
=
{
0
};
SMergeInfo
*
pMergeInfo
=
&
mergeInfo
;
ASSERT
(
pIdx
->
len
<=
0
||
keyFirst
>
pIdx
->
maxKey
);
if
(
pIdx
->
hasLast
)
{
// append to with last block
...
...
@@ -1452,39 +1496,47 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
SCompBlock
*
pCompBlock
=
blockAtIdx
(
pHelper
,
pIdx
->
numOfBlocks
-
1
);
ASSERT
(
pCompBlock
->
last
&&
pCompBlock
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
);
tdResetDataCols
(
pDataCols
);
int
rowsRead
=
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
maxKey
,
defaultRowsInBlock
-
pCompBlock
->
numOfRows
,
pDataCols
,
NULL
,
0
,
pCfg
->
update
);
ASSERT
(
rowsRead
>
0
&&
rowsRead
==
pDataCols
->
numOfRows
);
if
(
rowsRead
+
pCompBlock
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
&&
pCompBlock
->
numOfSubBlocks
<
TSDB_MAX_SUBBLOCKS
&&
!
TSDB_NLAST_FILE_OPENED
(
pHelper
))
{
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperLastF
(
pHelper
),
pDataCols
,
&
compBlock
,
true
,
false
)
<
0
)
return
-
1
;
if
(
tsdbAddSubBlock
(
pHelper
,
&
compBlock
,
pIdx
->
numOfBlocks
-
1
,
rowsRead
)
<
0
)
return
-
1
;
}
else
{
if
(
tsdbLoadBlockData
(
pHelper
,
pCompBlock
,
NULL
)
<
0
)
return
-
1
;
ASSERT
(
pHelper
->
pDataCols
[
0
]
->
numOfRows
==
pCompBlock
->
numOfRows
);
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
maxKey
,
defaultRowsInBlock
-
pCompBlock
->
numOfRows
,
pDataCols
,
NULL
,
0
,
pCfg
->
update
,
pMergeInfo
);
if
(
tdMergeDataCols
(
pHelper
->
pDataCols
[
0
],
pDataCols
,
pDataCols
->
numOfRows
)
<
0
)
return
-
1
;
ASSERT
(
pHelper
->
pDataCols
[
0
]
->
numOfRows
==
pCompBlock
->
numOfRows
+
pDataCols
->
numOfRows
);
ASSERT
(
pMergeInfo
->
rowsInserted
==
pMergeInfo
->
nOperations
&&
pMergeInfo
->
nOperations
==
pDataCols
->
numOfRows
);
if
(
tsdbWriteBlockToProperFile
(
pHelper
,
pHelper
->
pDataCols
[
0
],
&
compBlock
)
<
0
)
return
-
1
;
if
(
tsdbUpdateSuperBlock
(
pHelper
,
&
compBlock
,
pIdx
->
numOfBlocks
-
1
)
<
0
)
return
-
1
;
}
if
(
pDataCols
->
numOfRows
>
0
)
{
ASSERT
((
pMergeInfo
->
keyFirst
==
dataColsKeyFirst
(
pDataCols
))
&&
(
pMergeInfo
->
keyLast
==
dataColsKeyLast
(
pDataCols
)));
if
(
pHelper
->
hasOldLastBlock
)
pHelper
->
hasOldLastBlock
=
false
;
if
(
pDataCols
->
numOfRows
+
pCompBlock
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
&&
pCompBlock
->
numOfSubBlocks
<
TSDB_MAX_SUBBLOCKS
&&
!
TSDB_NLAST_FILE_OPENED
(
pHelper
))
{
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperLastF
(
pHelper
),
pDataCols
,
&
compBlock
,
true
,
false
)
<
0
)
return
-
1
;
if
(
tsdbAddSubBlock
(
pHelper
,
&
compBlock
,
pIdx
->
numOfBlocks
-
1
,
pMergeInfo
)
<
0
)
return
-
1
;
}
else
{
if
(
tsdbLoadBlockData
(
pHelper
,
pCompBlock
,
NULL
)
<
0
)
return
-
1
;
ASSERT
(
pHelper
->
pDataCols
[
0
]
->
numOfRows
==
pCompBlock
->
numOfRows
);
if
(
tdMergeDataCols
(
pHelper
->
pDataCols
[
0
],
pDataCols
,
pDataCols
->
numOfRows
)
<
0
)
return
-
1
;
ASSERT
(
pHelper
->
pDataCols
[
0
]
->
numOfRows
==
pCompBlock
->
numOfRows
+
pDataCols
->
numOfRows
);
if
(
tsdbWriteBlockToProperFile
(
pHelper
,
pHelper
->
pDataCols
[
0
],
&
compBlock
)
<
0
)
return
-
1
;
if
(
tsdbUpdateSuperBlock
(
pHelper
,
&
compBlock
,
pIdx
->
numOfBlocks
-
1
)
<
0
)
return
-
1
;
}
if
(
pHelper
->
hasOldLastBlock
)
pHelper
->
hasOldLastBlock
=
false
;
}
}
else
{
ASSERT
(
!
pHelper
->
hasOldLastBlock
);
tdResetDataCols
(
pDataCols
);
int
rowsRead
=
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
maxKey
,
defaultRowsInBlock
,
pDataCols
,
NULL
,
0
,
pCfg
->
update
);
ASSERT
(
rowsRead
>
0
&&
rowsRead
==
pDataCols
->
numOfRows
);
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
maxKey
,
defaultRowsInBlock
,
pDataCols
,
NULL
,
0
,
pCfg
->
update
,
pMergeInfo
);
ASSERT
(
pMergeInfo
->
rowsInserted
==
pMergeInfo
->
nOperations
&&
pMergeInfo
->
nOperations
==
pDataCols
->
numOfRows
);
if
(
tsdbWriteBlockToProperFile
(
pHelper
,
pDataCols
,
&
compBlock
)
<
0
)
return
-
1
;
if
(
tsdbInsertSuperBlock
(
pHelper
,
&
compBlock
,
pIdx
->
numOfBlocks
)
<
0
)
return
-
1
;
if
(
pDataCols
->
numOfRows
>
0
)
{
ASSERT
((
pMergeInfo
->
keyFirst
==
dataColsKeyFirst
(
pDataCols
))
&&
(
pMergeInfo
->
keyLast
==
dataColsKeyLast
(
pDataCols
)));
if
(
tsdbWriteBlockToProperFile
(
pHelper
,
pDataCols
,
&
compBlock
)
<
0
)
return
-
1
;
if
(
tsdbInsertSuperBlock
(
pHelper
,
&
compBlock
,
pIdx
->
numOfBlocks
)
<
0
)
return
-
1
;
}
}
#ifndef NDEBUG
TSKEY
keyNext
=
tsdbNextIterKey
(
pCommitIter
->
pIter
);
ASSERT
(
keyNext
<
0
||
keyNext
>
pIdx
->
maxKey
);
ASSERT
(
keyNext
==
TSDB_DATA_TIMESTAMP_NULL
||
keyNext
>
pIdx
->
maxKey
);
#endif
return
0
;
...
...
@@ -1492,13 +1544,15 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
static
int
tsdbProcessMergeCommit
(
SRWHelper
*
pHelper
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pDataCols
,
TSKEY
maxKey
,
int
*
blkIdx
)
{
STsdbCfg
*
pCfg
=
&
(
pHelper
->
pRepo
->
config
);
STable
*
pTable
=
pCommitIter
->
pTable
;
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
SCompBlock
compBlock
=
{
0
};
TSKEY
keyFirst
=
tsdbNextIterKey
(
pCommitIter
->
pIter
);
int
defaultRowsInBlock
=
pCfg
->
maxRowsPerFileBlock
*
4
/
5
;
SDataCols
*
pDataCols0
=
pHelper
->
pDataCols
[
0
];
STsdbCfg
*
pCfg
=
&
(
pHelper
->
pRepo
->
config
);
STable
*
pTable
=
pCommitIter
->
pTable
;
SCompIdx
*
pIdx
=
&
(
pHelper
->
curCompIdx
);
SCompBlock
compBlock
=
{
0
};
TSKEY
keyFirst
=
tsdbNextIterKey
(
pCommitIter
->
pIter
);
int
defaultRowsInBlock
=
pCfg
->
maxRowsPerFileBlock
*
4
/
5
;
SDataCols
*
pDataCols0
=
pHelper
->
pDataCols
[
0
];
SMergeInfo
mergeInfo
=
{
0
};
SMergeInfo
*
pMergeInfo
=
&
mergeInfo
;
SSkipListIterator
slIter
=
{
0
};
...
...
@@ -1509,120 +1563,82 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
ASSERT
(
pCompBlock
!=
NULL
);
int
tblkIdx
=
(
int32_t
)(
TSDB_GET_COMPBLOCK_IDX
(
pHelper
,
pCompBlock
));
if
(
pCompBlock
->
last
)
{
ASSERT
(
pCompBlock
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
&&
tblkIdx
==
pIdx
->
numOfBlocks
-
1
);
ASSERT
((
!
TSDB_IS_LAST_BLOCK
(
pCompBlock
))
||
(
tblkIdx
==
pIdx
->
numOfBlocks
-
1
));
if
((
!
TSDB_IS_LAST_BLOCK
(
pCompBlock
))
&&
keyFirst
<
pCompBlock
->
keyFirst
)
{
// Loop to write data until pCompBlock->keyFirst-1
while
(
true
)
{
tdResetDataCols
(
pDataCols
);
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
pCompBlock
->
keyLast
-
1
,
defaultRowsInBlock
,
pDataCols
,
NULL
,
0
,
pCfg
->
update
,
pMergeInfo
);
ASSERT
(
pMergeInfo
->
rowsInserted
==
pMergeInfo
->
nOperations
&&
pMergeInfo
->
nOperations
==
pDataCols
->
numOfRows
);
if
(
pDataCols
->
numOfRows
==
0
)
break
;
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperDataF
(
pHelper
),
pDataCols
,
&
compBlock
,
false
,
true
)
<
0
)
return
-
1
;
if
(
tsdbInsertSuperBlock
(
pHelper
,
&
compBlock
,
tblkIdx
)
<
0
)
return
-
1
;
tblkIdx
++
;
}
ASSERT
(
tblkIdx
==
0
||
(
tsdbNextIterKey
(
pCommitIter
->
pIter
)
==
TSDB_DATA_TIMESTAMP_NULL
||
tsdbNextIterKey
(
pCommitIter
->
pIter
)
>
blockAtIdx
(
pHelper
,
tblkIdx
-
1
)
->
keyLast
));
}
else
{
int16_t
colId
=
0
;
slIter
=
*
(
pCommitIter
->
pIter
);
if
(
tsdbLoadBlockDataCols
(
pHelper
,
pCompBlock
,
NULL
,
&
colId
,
1
)
<
0
)
return
-
1
;
ASSERT
(
pDataCols0
->
numOfRows
==
pCompBlock
->
numOfRows
);
int
rows1
=
defaultRowsInBlock
-
pCompBlock
->
numOfRows
;
int
rows2
=
tsdbLoadDataFromCache
(
pTable
,
&
slIter
,
maxKey
,
rows1
,
NULL
,
pDataCols0
->
cols
[
0
].
pData
,
pDataCols0
->
numOfRows
,
pCfg
->
update
);
if
(
!
pCfg
->
update
&&
rows2
==
0
)
{
// all data filtered out
TSKEY
keyLimit
=
(
tblkIdx
==
pIdx
->
numOfBlocks
-
1
)
?
maxKey
:
(
blockAtIdx
(
pHelper
,
tblkIdx
+
1
)
->
keyFirst
-
1
);
slIter
=
*
(
pCommitIter
->
pIter
);
tsdbLoadDataFromCache
(
pTable
,
&
slIter
,
keyLimit
,
INT_MAX
,
NULL
,
pDataCols0
->
cols
[
0
].
pData
,
pDataCols0
->
numOfRows
,
pCfg
->
update
,
pMergeInfo
);
if
(
pMergeInfo
->
nOperations
==
0
)
{
// Do nothing
ASSERT
(
pMergeInfo
->
rowsDeleteFailed
>
0
);
*
(
pCommitIter
->
pIter
)
=
slIter
;
tblkIdx
++
;
}
else
if
(
pCompBlock
->
numOfRows
+
pMergeInfo
->
rowsInserted
-
pMergeInfo
->
rowsDeleteSucceed
==
0
)
{
// Delete the block and do some stuff
ASSERT
(
pMergeInfo
->
keyFirst
==
INT64_MAX
&&
pMergeInfo
->
keyFirst
==
INT64_MIN
);
if
(
tsdbDeleteSuperBlock
(
pHelper
,
tblkIdx
)
<
0
)
return
-
1
;
*
pCommitIter
->
pIter
=
slIter
;
if
(
pCompBlock
->
last
&&
pHelper
->
hasOldLastBlock
)
pHelper
->
hasOldLastBlock
=
false
;
}
else
if
(
tsdbCheckAddSubBlockCond
(
pHelper
,
pCompBlock
,
pMergeInfo
,
pDataCols
->
maxPoints
))
{
// Append as a sub-block of the searched block
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
keyLimit
,
INT_MAX
,
pDataCols
,
pDataCols0
->
cols
[
0
].
pData
,
pDataCols0
->
numOfRows
,
pCfg
->
update
,
pMergeInfo
);
ASSERT
(
memcmp
(
pCommitIter
->
pIter
,
&
slIter
,
sizeof
(
slIter
))
==
0
);
if
(
tsdbWriteBlockToFile
(
pHelper
,
pCompBlock
->
last
?
helperLastF
(
pHelper
)
:
helperDataF
(
pHelper
),
pDataCols
,
&
compBlock
,
pCompBlock
->
last
,
false
)
<
0
)
{
return
-
1
;
}
if
(
tsdbAddSubBlock
(
pHelper
,
&
compBlock
,
tblkIdx
,
pMergeInfo
)
<
0
)
{
return
-
1
;
}
tblkIdx
++
;
}
else
{
if
(
pCompBlock
->
numOfRows
+
rows2
<
pCfg
->
minRowsPerFileBlock
&&
pCompBlock
->
numOfSubBlocks
<
TSDB_MAX_SUBBLOCKS
&&
!
TSDB_NLAST_FILE_OPENED
(
pHelper
))
{
tdResetDataCols
(
pDataCols
);
int
rowsRead
=
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
maxKey
,
rows1
,
pDataCols
,
pDataCols0
->
cols
[
0
].
pData
,
pDataCols0
->
numOfRows
,
pCfg
->
update
);
ASSERT
(
rowsRead
==
rows2
&&
rowsRead
<=
pDataCols
->
numOfRows
&&
pDataCols
->
numOfRows
>
0
);
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperLastF
(
pHelper
),
pDataCols
,
&
compBlock
,
true
,
false
)
<
0
)
return
-
1
;
if
(
tsdbAddSubBlock
(
pHelper
,
&
compBlock
,
tblkIdx
,
rowsRead
)
<
0
)
return
-
1
;
tblkIdx
++
;
}
else
{
if
(
tsdbLoadBlockData
(
pHelper
,
pCompBlock
,
NULL
)
<
0
)
return
-
1
;
int
round
=
0
;
int
dIter
=
0
;
while
(
true
)
{
int
rowsRead
=
tsdbLoadAndMergeFromCache
(
pDataCols0
,
&
dIter
,
pCommitIter
,
pDataCols
,
maxKey
,
defaultRowsInBlock
,
pCfg
->
update
);
if
(
rowsRead
==
0
)
break
;
// load the block data, merge with the memory data
if
(
tsdbLoadBlockData
(
pHelper
,
pCompBlock
,
NULL
)
<
0
)
return
-
1
;
int
round
=
0
;
int
dIter
=
0
;
while
(
true
)
{
tsdbLoadAndMergeFromCache
(
pDataCols0
,
&
dIter
,
pCommitIter
,
pDataCols
,
keyLimit
,
defaultRowsInBlock
,
pCfg
->
update
);
if
(
pDataCols
->
numOfRows
==
0
)
break
;
if
(
tblkIdx
==
pIdx
->
numOfBlocks
-
1
)
{
if
(
tsdbWriteBlockToProperFile
(
pHelper
,
pDataCols
,
&
compBlock
)
<
0
)
return
-
1
;
if
(
round
==
0
)
{
if
(
tsdbUpdateSuperBlock
(
pHelper
,
&
compBlock
,
tblkIdx
)
<
0
)
return
-
1
;
}
else
{
if
(
tsdbInsertSuperBlock
(
pHelper
,
&
compBlock
,
tblkIdx
)
<
0
)
return
-
1
;
}
tblkIdx
++
;
round
++
;
}
else
{
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperDataF
(
pHelper
),
pDataCols
,
&
compBlock
,
false
,
true
)
<
0
)
return
-
1
;
}
}
if
(
pHelper
->
hasOldLastBlock
)
pHelper
->
hasOldLastBlock
=
false
;
}
}
else
{
TSKEY
keyLimit
=
(
tblkIdx
==
pIdx
->
numOfBlocks
-
1
)
?
maxKey
:
(
pCompBlock
[
1
].
keyFirst
-
1
);
TSKEY
blkKeyFirst
=
pCompBlock
->
keyFirst
;
TSKEY
blkKeyLast
=
pCompBlock
->
keyLast
;
if
(
keyFirst
<
blkKeyFirst
)
{
while
(
true
)
{
tdResetDataCols
(
pDataCols
);
int
rowsRead
=
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
blkKeyFirst
-
1
,
defaultRowsInBlock
,
pDataCols
,
NULL
,
0
,
pCfg
->
update
);
if
(
rowsRead
==
0
)
break
;
ASSERT
(
rowsRead
==
pDataCols
->
numOfRows
);
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperDataF
(
pHelper
),
pDataCols
,
&
compBlock
,
false
,
true
)
<
0
)
return
-
1
;
if
(
tsdbInsertSuperBlock
(
pHelper
,
&
compBlock
,
tblkIdx
)
<
0
)
return
-
1
;
tblkIdx
++
;
}
ASSERT
(
tblkIdx
==
0
||
(
tsdbNextIterKey
(
pCommitIter
->
pIter
)
<
0
||
tsdbNextIterKey
(
pCommitIter
->
pIter
)
>
blockAtIdx
(
pHelper
,
tblkIdx
-
1
)
->
keyLast
));
}
else
{
ASSERT
(
keyFirst
<=
blkKeyLast
);
int16_t
colId
=
0
;
if
(
tsdbLoadBlockDataCols
(
pHelper
,
pCompBlock
,
NULL
,
&
colId
,
1
)
<
0
)
return
-
1
;
slIter
=
*
(
pCommitIter
->
pIter
);
int
rows1
=
(
pCfg
->
maxRowsPerFileBlock
-
pCompBlock
->
numOfRows
);
int
rows2
=
tsdbLoadDataFromCache
(
pTable
,
&
slIter
,
blkKeyLast
,
INT_MAX
,
NULL
,
pDataCols0
->
cols
[
0
].
pData
,
pDataCols0
->
numOfRows
,
pCfg
->
update
);
if
(
!
pCfg
->
update
&&
rows2
==
0
)
{
// all filtered out
*
(
pCommitIter
->
pIter
)
=
slIter
;
ASSERT
(
tblkIdx
==
0
||
(
tsdbNextIterKey
(
pCommitIter
->
pIter
)
<
0
||
tsdbNextIterKey
(
pCommitIter
->
pIter
)
>
blockAtIdx
(
pHelper
,
tblkIdx
-
1
)
->
keyLast
));
}
else
{
int
rows3
=
tsdbLoadDataFromCache
(
pTable
,
&
slIter
,
keyLimit
,
INT_MAX
,
NULL
,
NULL
,
0
,
pCfg
->
update
)
+
rows2
;
if
(
pCompBlock
->
numOfSubBlocks
<
TSDB_MAX_SUBBLOCKS
&&
rows1
>=
rows2
)
{
int
rows
=
(
rows1
>=
rows3
)
?
rows3
:
rows2
;
tdResetDataCols
(
pDataCols
);
int
rowsRead
=
tsdbLoadDataFromCache
(
pTable
,
pCommitIter
->
pIter
,
keyLimit
,
rows
,
pDataCols
,
pDataCols0
->
cols
[
0
].
pData
,
pDataCols0
->
numOfRows
,
pCfg
->
update
);
ASSERT
(
rowsRead
==
rows
&&
rowsRead
<=
pDataCols
->
numOfRows
);
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperDataF
(
pHelper
),
pDataCols
,
&
compBlock
,
false
,
false
)
<
0
)
return
-
1
;
if
(
tsdbAddSubBlock
(
pHelper
,
&
compBlock
,
tblkIdx
,
rowsRead
)
<
0
)
return
-
1
;
tblkIdx
++
;
ASSERT
(
tblkIdx
==
0
||
(
tsdbNextIterKey
(
pCommitIter
->
pIter
)
<
0
||
tsdbNextIterKey
(
pCommitIter
->
pIter
)
>
blockAtIdx
(
pHelper
,
tblkIdx
-
1
)
->
keyLast
));
if
(
round
==
0
)
{
if
(
pCompBlock
->
last
&&
pHelper
->
hasOldLastBlock
)
pHelper
->
hasOldLastBlock
=
false
;
if
(
tsdbUpdateSuperBlock
(
pHelper
,
&
compBlock
,
tblkIdx
)
<
0
)
return
-
1
;
}
else
{
if
(
tsdbLoadBlockData
(
pHelper
,
pCompBlock
,
NULL
)
<
0
)
return
-
1
;
int
round
=
0
;
int
dIter
=
0
;
while
(
true
)
{
int
rowsRead
=
tsdbLoadAndMergeFromCache
(
pDataCols0
,
&
dIter
,
pCommitIter
,
pDataCols
,
keyLimit
,
defaultRowsInBlock
,
pCfg
->
update
);
if
(
rowsRead
==
0
)
break
;
if
(
tsdbWriteBlockToFile
(
pHelper
,
helperDataF
(
pHelper
),
pDataCols
,
&
compBlock
,
false
,
true
)
<
0
)
return
-
1
;
if
(
round
==
0
)
{
if
(
tsdbUpdateSuperBlock
(
pHelper
,
&
compBlock
,
tblkIdx
)
<
0
)
return
-
1
;
}
else
{
if
(
tsdbInsertSuperBlock
(
pHelper
,
&
compBlock
,
tblkIdx
)
<
0
)
return
-
1
;
}
round
++
;
tblkIdx
++
;
}
ASSERT
(
tblkIdx
==
0
||
(
tsdbNextIterKey
(
pCommitIter
->
pIter
)
<
0
||
tsdbNextIterKey
(
pCommitIter
->
pIter
)
>
blockAtIdx
(
pHelper
,
tblkIdx
-
1
)
->
keyLast
));
if
(
tsdbInsertSuperBlock
(
pHelper
,
&
compBlock
,
tblkIdx
)
<
0
)
return
-
1
;
}
round
++
;
tblkIdx
++
;
}
}
}
...
...
@@ -1631,9 +1647,8 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
return
0
;
}
static
int
tsdbLoadAndMergeFromCache
(
SDataCols
*
pDataCols
,
int
*
iter
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pTarget
,
static
void
tsdbLoadAndMergeFromCache
(
SDataCols
*
pDataCols
,
int
*
iter
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pTarget
,
TSKEY
maxKey
,
int
maxRows
,
int8_t
update
)
{
int
numOfRows
=
0
;
TSKEY
key1
=
INT64_MAX
;
TSKEY
key2
=
INT64_MAX
;
STSchema
*
pSchema
=
NULL
;
...
...
@@ -1643,39 +1658,60 @@ static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIte
while
(
true
)
{
key1
=
(
*
iter
>=
pDataCols
->
numOfRows
)
?
INT64_MAX
:
dataColsKeyAt
(
pDataCols
,
*
iter
);
bool
isRowDel
=
false
;
SDataRow
row
=
tsdbNextIterRow
(
pCommitIter
->
pIter
);
key2
=
(
row
==
NULL
||
dataRowKey
(
row
)
>
maxKey
)
?
INT64_MAX
:
dataRowKey
(
row
);
if
(
row
==
NULL
||
dataRowKey
(
row
)
>
maxKey
)
{
key2
=
INT64_MAX
;
}
else
{
key2
=
dataRowKey
(
row
);
isRowDel
=
dataRowDeleted
(
row
);
}
if
(
key1
==
INT64_MAX
&&
key2
==
INT64_MAX
)
break
;
if
(
(
key1
<
key2
)
||
((
!
update
)
&&
(
key1
==
key2
))
)
{
if
(
key1
<
key2
)
{
for
(
int
i
=
0
;
i
<
pDataCols
->
numOfCols
;
i
++
)
{
dataColAppendVal
(
pTarget
->
cols
+
i
,
tdGetColDataOfRow
(
pDataCols
->
cols
+
i
,
*
iter
),
pTarget
->
numOfRows
,
pTarget
->
maxPoints
);
}
pTarget
->
numOfRows
++
;
(
*
iter
)
++
;
}
else
if
(
key1
>
key2
)
{
if
(
!
isRowDel
)
{
if
(
pSchema
==
NULL
||
schemaVersion
(
pSchema
)
!=
dataRowVersion
(
row
))
{
pSchema
=
tsdbGetTableSchemaImpl
(
pCommitIter
->
pTable
,
false
,
false
,
dataRowVersion
(
row
));
ASSERT
(
pSchema
!=
NULL
);
}
if
((
!
update
)
&&
(
key1
==
key2
))
{
tSkipListIterNext
(
pCommitIter
->
pIter
);
tdAppendDataRowToDataCol
(
row
,
pSchema
,
pTarget
);
}
tSkipListIterNext
(
pCommitIter
->
pIter
);
}
else
{
if
(
pSchema
==
NULL
||
schemaVersion
(
pSchema
)
!=
dataRowVersion
(
row
))
{
pSchema
=
tsdbGetTableSchemaImpl
(
pCommitIter
->
pTable
,
false
,
false
,
dataRowVersion
(
row
));
ASSERT
(
pSchema
!=
NULL
);
}
if
(
update
)
{
if
(
!
isRowDel
)
{
if
(
pSchema
==
NULL
||
schemaVersion
(
pSchema
)
!=
dataRowVersion
(
row
))
{
pSchema
=
tsdbGetTableSchemaImpl
(
pCommitIter
->
pTable
,
false
,
false
,
dataRowVersion
(
row
));
ASSERT
(
pSchema
!=
NULL
);
}
tdAppendDataRowToDataCol
(
row
,
pSchema
,
pTarget
);
tdAppendDataRowToDataCol
(
row
,
pSchema
,
pTarget
);
}
}
else
{
ASSERT
(
!
isRowDel
);
for
(
int
i
=
0
;
i
<
pDataCols
->
numOfCols
;
i
++
)
{
dataColAppendVal
(
pTarget
->
cols
+
i
,
tdGetColDataOfRow
(
pDataCols
->
cols
+
i
,
*
iter
),
pTarget
->
numOfRows
,
pTarget
->
maxPoints
);
}
pTarget
->
numOfRows
++
;
}
(
*
iter
)
++
;
tSkipListIterNext
(
pCommitIter
->
pIter
);
if
(
key1
==
key2
)
(
*
iter
)
++
;
}
numOfRows
++
;
if
(
numOfRows
>=
maxRows
)
break
;
ASSERT
(
numOfRows
==
pTarget
->
numOfRows
&&
numOfRows
<=
pTarget
->
maxPoints
);
}
return
numOfRows
;
}
static
int
tsdbWriteBlockToProperFile
(
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
,
SCompBlock
*
pCompBlock
)
{
...
...
@@ -1698,3 +1734,20 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols,
return
0
;
}
static
bool
tsdbCheckAddSubBlockCond
(
SRWHelper
*
pHelper
,
SCompBlock
*
pCompBlock
,
SMergeInfo
*
pMergeInfo
,
int
maxOps
)
{
STsdbCfg
*
pCfg
=
&
(
pHelper
->
pRepo
->
config
);
int
mergeRows
=
pCompBlock
->
numOfRows
+
pMergeInfo
->
rowsInserted
-
pMergeInfo
->
rowsDeleteSucceed
;
ASSERT
(
mergeRows
>
0
);
if
(
pCompBlock
->
numOfSubBlocks
<
TSDB_MAX_SUBBLOCKS
&&
pMergeInfo
->
nOperations
<=
maxOps
)
{
if
(
pCompBlock
->
last
)
{
if
(
!
TSDB_NLAST_FILE_OPENED
(
pHelper
)
&&
mergeRows
<
pCfg
->
minRowsPerFileBlock
)
return
true
;
}
else
{
if
(
mergeRows
<
pCfg
->
maxRowsPerFileBlock
)
return
true
;
}
}
return
false
;
}
\ No newline at end of file
src/util/inc/tskiplist.h
浏览文件 @
bbf36777
...
...
@@ -39,16 +39,11 @@ typedef char *(*__sl_key_fn_t)(const void *);
typedef
struct
SSkipListNode
{
uint8_t
level
;
uint8_t
flags
;
void
*
pData
;
struct
SSkipListNode
*
forwards
[];
}
SSkipListNode
;
#define SL_NODE_DELETED_FLAG (uint8_t)0x1
#define SL_GET_NODE_DATA(n) (n)->pData
#define SL_IS_NODE_DELETED(n) ((n)->flags & SL_NODE_DELETED_FLAG)
#define SL_SET_NODE_DELETED(n) (n)->flags |= SL_NODE_DELETED_FLAG
#define SL_NODE_GET_FORWARD_POINTER(n, l) (n)->forwards[(l)]
#define SL_NODE_GET_BACKWARD_POINTER(n, l) (n)->forwards[(n)->level + (l)]
...
...
@@ -109,8 +104,7 @@ typedef struct SSkipList {
uint8_t
flags
;
uint8_t
type
;
// static info above
uint8_t
level
;
uint32_t
size
;
// semantic meaning of size
uint32_t
tsize
;
// # of all skiplist nodes in this SL
uint32_t
size
;
SSkipListNode
*
pHead
;
// point to the first element
SSkipListNode
*
pTail
;
// point to the last element
#if SKIP_LIST_RECORD_PERFORMANCE
...
...
@@ -131,13 +125,13 @@ typedef struct SSkipListIterator {
#define SL_GET_MIN_KEY(s) SL_GET_NODE_KEY(s, SL_NODE_GET_FORWARD_POINTER((s)->pHead, 0))
#define SL_GET_MAX_KEY(s) SL_GET_NODE_KEY((s), SL_NODE_GET_BACKWARD_POINTER((s)->pTail, 0))
#define SL_SIZE(s) (s)->size
#define SL_TSIZE(s) (s)->tsize
SSkipList
*
tSkipListCreate
(
uint8_t
nMaxLevel
,
uint8_t
keyType
,
uint16_t
keyLen
,
uint8_t
flags
,
__sl_key_fn_t
fn
);
void
tSkipListDestroy
(
SSkipList
*
pSkipList
);
SSkipListNode
*
tSkipListPut
(
SSkipList
*
pSkipList
,
void
*
pData
);
SArray
*
tSkipListGet
(
SSkipList
*
pSkipList
,
SSkipListKey
pKey
);
void
tSkipListPrint
(
SSkipList
*
pSkipList
,
int16_t
nlevel
);
SSkipList
*
tSkipListCreate
(
uint8_t
maxLevel
,
uint8_t
keyType
,
uint16_t
keyLen
,
__compar_fn_t
comparFn
,
uint8_t
flags
,
__sl_key_fn_t
fn
);
void
tSkipListDestroy
(
SSkipList
*
pSkipList
);
SSkipListNode
*
tSkipListPut
(
SSkipList
*
pSkipList
,
void
*
pData
);
SArray
*
tSkipListGet
(
SSkipList
*
pSkipList
,
SSkipListKey
pKey
);
void
tSkipListPrint
(
SSkipList
*
pSkipList
,
int16_t
nlevel
);
SSkipListIterator
*
tSkipListCreateIter
(
SSkipList
*
pSkipList
);
SSkipListIterator
*
tSkipListCreateIterFromVal
(
SSkipList
*
pSkipList
,
const
char
*
val
,
int32_t
type
,
int32_t
order
);
bool
tSkipListIterNext
(
SSkipListIterator
*
iter
);
...
...
@@ -145,8 +139,6 @@ SSkipListNode * tSkipListIterGet(SSkipListIterator *iter);
void
*
tSkipListDestroyIter
(
SSkipListIterator
*
iter
);
uint32_t
tSkipListRemove
(
SSkipList
*
pSkipList
,
SSkipListKey
key
);
void
tSkipListRemoveNode
(
SSkipList
*
pSkipList
,
SSkipListNode
*
pNode
);
SSkipListKey
tSkipListGetMinKey
(
SSkipList
*
pSkipList
);
SSkipListKey
tSkipListGetMaxKey
(
SSkipList
*
pSkipList
);
#ifdef __cplusplus
}
...
...
src/util/src/tskiplist.c
浏览文件 @
bbf36777
...
...
@@ -34,7 +34,8 @@ static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList);
static
FORCE_INLINE
int
tSkipListUnlock
(
SSkipList
*
pSkipList
);
static
FORCE_INLINE
int32_t
getSkipListRandLevel
(
SSkipList
*
pSkipList
);
SSkipList
*
tSkipListCreate
(
uint8_t
maxLevel
,
uint8_t
keyType
,
uint16_t
keyLen
,
uint8_t
flags
,
__sl_key_fn_t
fn
)
{
SSkipList
*
tSkipListCreate
(
uint8_t
maxLevel
,
uint8_t
keyType
,
uint16_t
keyLen
,
__compar_fn_t
comparFn
,
uint8_t
flags
,
__sl_key_fn_t
fn
)
{
SSkipList
*
pSkipList
=
(
SSkipList
*
)
calloc
(
1
,
sizeof
(
SSkipList
));
if
(
pSkipList
==
NULL
)
return
NULL
;
...
...
@@ -47,7 +48,11 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, u
pSkipList
->
len
=
keyLen
;
pSkipList
->
flags
=
flags
;
pSkipList
->
keyFn
=
fn
;
pSkipList
->
comparFn
=
getKeyComparFunc
(
keyType
);
if
(
comparFn
==
NULL
)
{
pSkipList
->
comparFn
=
getKeyComparFunc
(
keyType
);
}
else
{
pSkipList
->
comparFn
=
comparFn
;
}
if
(
initForwardBackwardPtr
(
pSkipList
)
<
0
)
{
tSkipListDestroy
(
pSkipList
);
...
...
@@ -115,10 +120,6 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
if
(
dupMode
==
SL_UPDATE_DUP_KEY
)
{
pNode
=
SL_NODE_GET_FORWARD_POINTER
(
forward
[
0
],
0
);
atomic_store_ptr
(
&
(
pNode
->
pData
),
pData
);
if
(
SL_IS_NODE_DELETED
(
pNode
))
{
pNode
->
flags
&=
(
~
(
SL_NODE_DELETED_FLAG
));
pSkipList
->
size
++
;
}
}
}
else
{
pNode
=
tSkipListNewNode
(
getSkipListRandLevel
(
pSkipList
));
...
...
@@ -137,8 +138,6 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
uint32_t
tSkipListRemove
(
SSkipList
*
pSkipList
,
SSkipListKey
key
)
{
uint32_t
count
=
0
;
if
(
SL_DUP_MODE
(
pSkipList
)
==
SL_DISCARD_DUP_KEY
)
return
0
;
tSkipListWLock
(
pSkipList
);
SSkipListNode
*
pNode
=
getPriorNode
(
pSkipList
,
key
,
TSDB_ORDER_ASC
);
...
...
@@ -177,9 +176,7 @@ SArray *tSkipListGet(SSkipList *pSkipList, SSkipListKey key) {
if
(
pSkipList
->
comparFn
(
key
,
SL_GET_NODE_KEY
(
pSkipList
,
p
))
!=
0
)
{
break
;
}
if
(
!
SL_IS_NODE_DELETED
(
p
))
{
taosArrayPush
(
sa
,
&
p
);
}
taosArrayPush
(
sa
,
&
p
);
pNode
=
p
;
}
...
...
@@ -227,19 +224,13 @@ bool tSkipListIterNext(SSkipListIterator *iter) {
tSkipListRLock
(
pSkipList
);
if
(
iter
->
order
==
TSDB_ORDER_ASC
)
{
while
(
true
)
{
iter
->
cur
=
SL_NODE_GET_FORWARD_POINTER
(
iter
->
cur
,
0
);
iter
->
step
++
;
if
(
iter
->
cur
==
pSkipList
->
pTail
)
break
;
if
(
!
SL_IS_NODE_DELETED
(
iter
->
cur
))
break
;
}
if
(
iter
->
cur
==
pSkipList
->
pTail
)
return
false
;
iter
->
cur
=
SL_NODE_GET_FORWARD_POINTER
(
iter
->
cur
,
0
);
iter
->
step
++
;
}
else
{
while
(
true
)
{
iter
->
cur
=
SL_NODE_GET_BACKWARD_POINTER
(
iter
->
cur
,
0
);
iter
->
step
++
;
if
(
iter
->
cur
==
pSkipList
->
pHead
)
break
;
if
(
!
SL_IS_NODE_DELETED
(
iter
->
cur
))
break
;
}
if
(
iter
->
cur
==
pSkipList
->
pHead
)
return
false
;
iter
->
cur
=
SL_NODE_GET_BACKWARD_POINTER
(
iter
->
cur
,
0
);
iter
->
step
++
;
}
tSkipListUnlock
(
pSkipList
);
...
...
@@ -305,28 +296,6 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
}
}
SSkipListKey
tSkipListGetMinKey
(
SSkipList
*
pSkipList
)
{
if
(
pSkipList
==
NULL
||
SL_SIZE
(
pSkipList
)
==
0
)
return
NULL
;
SSkipListNode
*
pNode
=
pSkipList
->
pHead
;
while
((
pNode
=
SL_NODE_GET_FORWARD_POINTER
(
pNode
,
0
))
!=
pSkipList
->
pTail
)
{
if
(
!
SL_IS_NODE_DELETED
(
pNode
))
return
pSkipList
->
keyFn
(
pNode
->
pData
);
}
return
NULL
;
}
SSkipListKey
tSkipListGetMaxKey
(
SSkipList
*
pSkipList
)
{
if
(
pSkipList
==
NULL
||
SL_SIZE
(
pSkipList
)
==
0
)
return
NULL
;
SSkipListNode
*
pNode
=
pSkipList
->
pTail
;
while
((
pNode
=
SL_NODE_GET_BACKWARD_POINTER
(
pNode
,
0
))
!=
pSkipList
->
pHead
)
{
if
(
!
SL_IS_NODE_DELETED
(
pNode
))
return
pSkipList
->
keyFn
(
pNode
->
pData
);
}
return
NULL
;
}
static
void
tSkipListDoInsert
(
SSkipList
*
pSkipList
,
SSkipListNode
**
forward
,
SSkipListNode
*
pNode
)
{
for
(
int32_t
i
=
0
;
i
<
pNode
->
level
;
++
i
)
{
if
(
i
>=
pSkipList
->
level
)
{
...
...
@@ -349,7 +318,6 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk
if
(
pSkipList
->
level
<
pNode
->
level
)
pSkipList
->
level
=
pNode
->
level
;
pSkipList
->
size
+=
1
;
pSkipList
->
tsize
+=
1
;
}
static
SSkipListIterator
*
doCreateSkipListIterator
(
SSkipList
*
pSkipList
,
int32_t
order
)
{
...
...
@@ -447,27 +415,18 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **forward,
static
void
tSkipListRemoveNodeImpl
(
SSkipList
*
pSkipList
,
SSkipListNode
*
pNode
)
{
int32_t
level
=
pNode
->
level
;
uint8_t
dupMode
=
SL_DUP_MODE
(
pSkipList
);
ASSERT
(
dupMode
!=
SL_DISCARD_DUP_KEY
&&
dupMode
!=
SL_UPDATE_DUP_KEY
);
if
(
dupMode
==
SL_UPDATE_DUP_KEY
)
{
if
(
SL_IS_NODE_DELETED
(
pNode
))
{
return
;
}
else
{
SL_SET_NODE_DELETED
(
pNode
);
pSkipList
->
size
--
;
}
}
else
{
for
(
int32_t
j
=
level
-
1
;
j
>=
0
;
--
j
)
{
SSkipListNode
*
prev
=
SL_NODE_GET_BACKWARD_POINTER
(
pNode
,
j
);
SSkipListNode
*
next
=
SL_NODE_GET_FORWARD_POINTER
(
pNode
,
j
);
for
(
int32_t
j
=
level
-
1
;
j
>=
0
;
--
j
)
{
SSkipListNode
*
prev
=
SL_NODE_GET_BACKWARD_POINTER
(
pNode
,
j
);
SSkipListNode
*
next
=
SL_NODE_GET_FORWARD_POINTER
(
pNode
,
j
);
SL_NODE_GET_FORWARD_POINTER
(
prev
,
j
)
=
next
;
SL_NODE_GET_BACKWARD_POINTER
(
next
,
j
)
=
prev
;
}
tSkipListFreeNode
(
pNode
);
pSkipList
->
size
--
;
pSkipList
->
tsize
--
;
SL_NODE_GET_FORWARD_POINTER
(
prev
,
j
)
=
next
;
SL_NODE_GET_BACKWARD_POINTER
(
next
,
j
)
=
prev
;
}
tSkipListFreeNode
(
pNode
);
pSkipList
->
size
--
;
}
// Function must be called after calling tSkipListRemoveNodeImpl() function
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录