Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d71fece2
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d71fece2
编写于
2月 09, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
STSRow refactor
上级
3504d3f8
变更
19
展开全部
隐藏空白更改
内联
并排
Showing
19 changed file
with
645 addition
and
987 deletion
+645
-987
include/common/tdataformat.h
include/common/tdataformat.h
+25
-18
include/common/tmsg.h
include/common/tmsg.h
+3
-3
include/common/trow.h
include/common/trow.h
+198
-91
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+8
-2
source/common/src/tdataformat.c
source/common/src/tdataformat.c
+25
-13
source/common/src/tmsg.c
source/common/src/tmsg.c
+5
-5
source/common/src/trow.c
source/common/src/trow.c
+77
-574
source/dnode/vnode/src/inc/tsdbMemTable.h
source/dnode/vnode/src/inc/tsdbMemTable.h
+4
-4
source/dnode/vnode/src/inc/tsdbReadImpl.h
source/dnode/vnode/src/inc/tsdbReadImpl.h
+2
-2
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+12
-4
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+45
-15
source/dnode/vnode/src/tsdb/tsdbMain.c
source/dnode/vnode/src/tsdb/tsdbMain.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+55
-55
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+56
-51
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+60
-9
source/libs/parser/inc/dataBlockMgt.h
source/libs/parser/inc/dataBlockMgt.h
+14
-21
source/libs/parser/src/dataBlockMgt.c
source/libs/parser/src/dataBlockMgt.c
+24
-91
source/libs/parser/src/insertParser.c
source/libs/parser/src/insertParser.c
+26
-23
source/libs/parser/src/parserUtil.c
source/libs/parser/src/parserUtil.c
+4
-4
未找到文件。
include/common/tdataformat.h
浏览文件 @
d71fece2
...
...
@@ -26,6 +26,7 @@ extern "C" {
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP
#define TD_SUPPORT_READ2
#define TD_SUPPORT_BACK2 // suppport back compatibility of 2.0
#define TASSERT(x) ASSERT(x)
...
...
@@ -61,7 +62,7 @@ typedef struct {
int8_t
type
;
// Column type
col_id_t
colId
;
// column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
int16_t
bytes
;
// column bytes (restore to int16_t in case of misuse)
uint16_t
offset
;
// point offset in S
Data
Row after the header part.
uint16_t
offset
;
// point offset in S
Tp
Row after the header part.
}
STColumn
;
#define colType(col) ((col)->type)
...
...
@@ -78,9 +79,9 @@ typedef struct {
typedef
struct
{
int
version
;
// version
int
numOfCols
;
// Number of columns appended
int
tlen
;
// maximum length of a SData
Row without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) +
// (bytes))
uint16_t
flen
;
// First part length in a S
Data
Row after the header part
int
tlen
;
// maximum length of a STp
Row without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) +
// (bytes))
uint16_t
flen
;
// First part length in a S
Tp
Row after the header part
uint16_t
vlen
;
// pure value part length, excluded the overhead (bytes only)
STColumn
columns
[];
}
STSchema
;
...
...
@@ -161,19 +162,19 @@ typedef uint64_t TKEY;
#else
typedef
uint64_t
TKEY
;
// typedef uint64_t TKEY;
#define TKEY TSKEY
#define TKEY_INVALID UINT64_MAX
#define TKEY_NULL TKEY_INVALID
#define TKEY_NEGATIVE_FLAG (((TKEY)1) << 63)
#define TKEY_DELETE_FLAG (((TKEY)1) << 62)
#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG | TKEY_DELETE_FLAG))
#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG))
#define TKEY_IS_NEGATIVE(tkey) (((tkey)&TKEY_NEGATIVE_FLAG) != 0)
#define TKEY_IS_DELETED(tkey) (
((tkey)&TKEY_DELETE_FLAG) != 0
)
#define tdSetTKEYDeleted(tkey) ((tkey) | TKEY_DELETE_FLAG)
#define tdGetTKEY(key) (
((TKEY)TABS(key)) | (TKEY_NEGATIVE_FLAG & (TKEY)(key))
)
#define tdGetKey(t
key) (((TSKEY)((tkey)&TKEY_VALUE_FILTER)) * (TKEY_IS_NEGATIVE(tkey) ? -1 : 1)
)
#define TKEY_IS_DELETED(tkey) (
false
)
#define tdGetTKEY(key) (
key
)
#define tdGetKey(t
skey) (tskey
)
#define MIN_TS_KEY ((TSKEY)0x8000000000000001)
#define MAX_TS_KEY ((TSKEY)0x7fffffffffffffff)
...
...
@@ -206,6 +207,7 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) {
}
}
#if 0
// ----------------- Data row structure
/* A data row, the format is like below:
...
...
@@ -355,10 +357,12 @@ static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSche
memcpy(pData, value, pSrcSchema->columns[srcIdx].bytes);
}
}
#endif
// ----------------- Data column structure
typedef
struct
SDataCol
{
int8_t
type
;
// column type
int8_t
type
;
// column type
uint8_t
bitmap
:
1
;
// 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
uint8_t
reserve
:
7
;
int16_t
colId
;
// column ID
int
bytes
;
// column data bytes defined
int
offset
;
// data offset in a SDataRow (including the header size)
...
...
@@ -366,7 +370,7 @@ typedef struct SDataCol {
int
len
;
// column data length
VarDataOffsetT
*
dataOff
;
// For binary and nchar data, the offset in the data column
void
*
pData
;
// Actual data pointer
void
*
pBitmap
;
// Bitmap pointer
to mark Null/Norm(1 bit for each row)
void
*
pBitmap
;
// Bitmap pointer
TSKEY
ts
;
// only used in last NULL column
}
SDataCol
;
...
...
@@ -378,10 +382,11 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints);
void
dataColInit
(
SDataCol
*
pDataCol
,
STColumn
*
pCol
,
int
maxPoints
);
int
dataColAppendVal
(
SDataCol
*
pCol
,
const
void
*
value
,
int
numOfRows
,
int
maxPoints
);
void
dataColSetOffset
(
SDataCol
*
pCol
,
int
nEle
);
void
*
dataColSetOffset
(
SDataCol
*
pCol
,
int
nEle
);
bool
isNEleNull
(
SDataCol
*
pCol
,
int
nEle
);
#if 0
// Get the data pointer from a column-wised data
static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) {
if (isAllRowsNull(pCol)) {
...
...
@@ -403,7 +408,7 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
return TYPE_BYTES[pDataCol->type] * rows;
}
}
#endif
typedef
struct
{
col_id_t
maxCols
;
// max number of columns
col_id_t
numOfCols
;
// Total number of cols
...
...
@@ -521,6 +526,7 @@ static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) {
return
taosbsearch
(
&
colId
,
kvRowColIdx
(
row
),
kvRowNCols
(
row
),
sizeof
(
SColIdx
),
comparTagId
,
TD_EQ
);
}
#if 0
// offset here not include kvRow header length
static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, bool isCopyValData, int16_t colId, int8_t type,
int32_t offset) {
...
...
@@ -567,7 +573,7 @@ static FORCE_INLINE void *tdGetKVRowValOfColEx(SKVRow row, int16_t colId, int32_
}
return NULL;
}
#endif
// ----------------- K-V data row builder
typedef
struct
{
int16_t
tCols
;
...
...
@@ -611,7 +617,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
return
0
;
}
#if 0
// ----------------- SMemRow appended with tuple row structure
/*
* |---------|------------------------------------------------- len ---------------------------------->|
...
...
@@ -770,6 +776,7 @@ static FORCE_INLINE void setSColInfo(SColInfo *colInfo, int16_t colId, uint8_t c
}
SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2);
#endif
#ifdef __cplusplus
}
...
...
include/common/tmsg.h
浏览文件 @
d71fece2
...
...
@@ -25,7 +25,7 @@ extern "C" {
#include "taoserror.h"
#include "tarray.h"
#include "tcoding.h"
#include "t
dataformat
.h"
#include "t
row
.h"
#include "thash.h"
#include "tlist.h"
...
...
@@ -212,7 +212,7 @@ typedef struct {
typedef
struct
{
int32_t
totalLen
;
int32_t
len
;
S
MemRow
row
;
S
TSRow
*
row
;
}
SSubmitBlkIter
;
typedef
struct
{
...
...
@@ -224,7 +224,7 @@ typedef struct {
int
tInitSubmitMsgIter
(
SSubmitMsg
*
pMsg
,
SSubmitMsgIter
*
pIter
);
int
tGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
,
SSubmitBlk
**
pPBlock
);
int
tInitSubmitBlkIter
(
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
S
MemRow
tGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
);
S
TSRow
*
tGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
);
typedef
struct
{
int32_t
index
;
// index of failed block in submit blocks
...
...
include/common/trow.h
浏览文件 @
d71fece2
此差异已折叠。
点击以展开。
source/client/test/clientTests.cpp
浏览文件 @
d71fece2
...
...
@@ -565,12 +565,18 @@ TEST(testCase, insert_test) {
#endif
#if
0
#if
1
TEST
(
testCase
,
projection_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"create database if not exists abc1 vgroups 2"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in create db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable st1 (ts timestamp, k int) tags(a int)"
);
...
...
source/common/src/tdataformat.c
浏览文件 @
d71fece2
...
...
@@ -20,17 +20,23 @@
#include "tarray.h"
static
void
dataColSetNEleNull
(
SDataCol
*
pCol
,
int
nEle
);
#if 0
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows, bool forceSetNull);
#endif
int
tdAllocMemForCol
(
SDataCol
*
pCol
,
int
maxPoints
)
{
int
spaceNeeded
=
pCol
->
bytes
*
maxPoints
;
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
spaceNeeded
+=
sizeof
(
VarDataOffsetT
)
*
maxPoints
;
}
#ifdef TD_SUPPORT_BITMAP
spaceNeeded
+=
(
int
)
TD_BITMAP_BYTES
(
maxPoints
);
int32_t
nBitmapBytes
=
(
int32_t
)
TD_BITMAP_BYTES
(
maxPoints
);
spaceNeeded
+=
(
int
)
nBitmapBytes
;
// TODO: Currently, the compression of bitmap parts is affiliated to the column data parts, thus allocate 1 more
// TYPE_BYTES as to comprise complete TYPE_BYTES. Otherwise, invalid read/write would be triggered.
spaceNeeded
+=
TYPE_BYTES
[
pCol
->
type
];
#endif
if
(
pCol
->
spaceSize
<
spaceNeeded
)
{
void
*
ptr
=
realloc
(
pCol
->
pData
,
spaceNeeded
);
if
(
ptr
==
NULL
)
{
...
...
@@ -42,16 +48,17 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
pCol
->
spaceSize
=
spaceNeeded
;
}
}
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
pCol
->
dataOff
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
bytes
*
maxPoints
);
#ifdef TD_SUPPORT_BITMAP
pCol
->
pBitmap
=
POINTER_SHIFT
(
pCol
->
dataOff
,
sizeof
(
VarDataOffsetT
)
*
maxPoints
);
#endif
}
#ifdef TD_SUPPORT_BITMAP
else
{
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
pCol
->
pBitmap
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
bytes
*
maxPoints
);
pCol
->
dataOff
=
POINTER_SHIFT
(
pCol
->
pBitmap
,
nBitmapBytes
);
}
else
{
pCol
->
pBitmap
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
bytes
*
maxPoints
);
}
#else
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
pCol
->
dataOff
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
bytes
*
maxPoints
);
}
#endif
return
0
;
}
...
...
@@ -205,6 +212,7 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
return
pSchema
;
}
#if 0
/**
* Initialize a data row
*/
...
...
@@ -245,12 +253,13 @@ SMemRow tdMemRowDup(SMemRow row) {
memRowCpy(trow, row);
return trow;
}
#endif
void
dataColInit
(
SDataCol
*
pDataCol
,
STColumn
*
pCol
,
int
maxPoints
)
{
pDataCol
->
type
=
colType
(
pCol
);
pDataCol
->
colId
=
colColId
(
pCol
);
pDataCol
->
bytes
=
colBytes
(
pCol
);
pDataCol
->
offset
=
colOffset
(
pCol
)
+
TD_DATA_ROW_HEAD_SIZE
;
pDataCol
->
offset
=
colOffset
(
pCol
)
+
0
;
//
TD_DATA_ROW_HEAD_SIZE;
pDataCol
->
len
=
0
;
}
...
...
@@ -326,7 +335,7 @@ static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
}
}
void
dataColSetOffset
(
SDataCol
*
pCol
,
int
nEle
)
{
void
*
dataColSetOffset
(
SDataCol
*
pCol
,
int
nEle
)
{
ASSERT
(((
pCol
->
type
==
TSDB_DATA_TYPE_BINARY
)
||
(
pCol
->
type
==
TSDB_DATA_TYPE_NCHAR
)));
void
*
tptr
=
pCol
->
pData
;
...
...
@@ -338,6 +347,7 @@ void dataColSetOffset(SDataCol *pCol, int nEle) {
offset
+=
varDataTLen
(
tptr
);
tptr
=
POINTER_SHIFT
(
tptr
,
varDataTLen
(
tptr
));
}
return
POINTER_SHIFT
(
tptr
,
varDataTLen
(
tptr
));
}
SDataCols
*
tdNewDataCols
(
int
maxCols
,
int
maxRows
)
{
...
...
@@ -455,7 +465,7 @@ void tdResetDataCols(SDataCols *pCols) {
}
}
}
#if 0
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));
...
...
@@ -628,6 +638,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i
ASSERT(target->numOfRows <= target->maxPoints);
}
}
#endif
SKVRow
tdKVRowDup
(
SKVRow
row
)
{
SKVRow
trow
=
malloc
(
kvRowLen
(
row
));
...
...
@@ -787,7 +798,7 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
return
row
;
}
#if 0
SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) {
#if 0
ASSERT(memRowKey(row1) == memRowKey(row2));
...
...
@@ -881,3 +892,4 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
taosArrayDestroy(stashRow);
return buffer;
}
#endif
\ No newline at end of file
source/common/src/tmsg.c
浏览文件 @
d71fece2
...
...
@@ -67,19 +67,19 @@ int tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if
(
pBlock
->
dataLen
<=
0
)
return
-
1
;
pIter
->
totalLen
=
pBlock
->
dataLen
;
pIter
->
len
=
0
;
pIter
->
row
=
(
S
MemRow
)(
pBlock
->
data
+
pBlock
->
schemaLen
);
pIter
->
row
=
(
S
TSRow
*
)(
pBlock
->
data
+
pBlock
->
schemaLen
);
return
0
;
}
S
MemRow
tGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
)
{
S
MemRow
row
=
pIter
->
row
;
S
TSRow
*
tGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
)
{
S
TSRow
*
row
=
pIter
->
row
;
if
(
pIter
->
len
>=
pIter
->
totalLen
)
{
return
NULL
;
}
else
{
pIter
->
len
+=
memRowTLen
(
row
);
pIter
->
len
+=
TD_ROW_LEN
(
row
);
if
(
pIter
->
len
<
pIter
->
totalLen
)
{
pIter
->
row
=
POINTER_SHIFT
(
row
,
memRowTLen
(
row
));
pIter
->
row
=
POINTER_SHIFT
(
row
,
TD_ROW_LEN
(
row
));
}
return
row
;
}
...
...
source/common/src/trow.c
浏览文件 @
d71fece2
此差异已折叠。
点击以展开。
source/dnode/vnode/src/inc/tsdbMemTable.h
浏览文件 @
d71fece2
...
...
@@ -58,20 +58,20 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmi
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
static
FORCE_INLINE
S
MemRow
tsdbNextIterRow
(
SSkipListIterator
*
pIter
)
{
static
FORCE_INLINE
S
TSRow
*
tsdbNextIterRow
(
SSkipListIterator
*
pIter
)
{
if
(
pIter
==
NULL
)
return
NULL
;
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
if
(
node
==
NULL
)
return
NULL
;
return
(
S
MemRow
)
SL_GET_NODE_DATA
(
node
);
return
(
S
TSRow
*
)
SL_GET_NODE_DATA
(
node
);
}
static
FORCE_INLINE
TSKEY
tsdbNextIterKey
(
SSkipListIterator
*
pIter
)
{
S
MemRow
row
=
tsdbNextIterRow
(
pIter
);
S
TSRow
*
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
return
TSDB_DATA_TIMESTAMP_NULL
;
return
memRowKey
(
row
);
return
TD_ROW_KEY
(
row
);
}
#ifdef __cplusplus
...
...
source/dnode/vnode/src/inc/tsdbReadImpl.h
浏览文件 @
d71fece2
...
...
@@ -75,8 +75,8 @@ typedef struct {
typedef
struct
{
int16_t
colId
;
int16_t
bitmap
:
1
;
// 0: no bitmap if all rows not null, 1: has bitmap if has null rows
int16_t
reserve
:
15
;
uint16_t
bitmap
:
1
;
// 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
uint16_t
reserve
:
15
;
int32_t
len
;
uint32_t
type
:
8
;
uint32_t
offset
:
24
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
d71fece2
...
...
@@ -448,19 +448,27 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
taosArrayPush
(
pArray
,
&
colInfo
);
}
SMemRow
row
;
int32_t
kvIdx
=
0
;
STSRowIter
iter
=
{
0
};
tdSTSRowIterInit
(
&
iter
,
pTschema
);
STSRow
*
row
;
// int32_t kvIdx = 0;
int32_t
curRow
=
0
;
tInitSubmitBlkIter
(
pHandle
->
pBlock
,
&
pHandle
->
blkIter
);
while
((
row
=
tGetSubmitBlkNext
(
&
pHandle
->
blkIter
))
!=
NULL
)
{
tdSTSRowIterReset
(
&
iter
,
row
);
// get all wanted col of that block
for
(
int32_t
i
=
0
;
i
<
colNumNeed
;
i
++
)
{
SColumnInfoData
*
pColData
=
taosArrayGet
(
pArray
,
i
);
STColumn
*
pCol
=
schemaColAt
(
pTschema
,
i
);
// TODO
ASSERT
(
pCol
->
colId
==
pColData
->
info
.
colId
);
void
*
val
=
tdGetMemRowDataOfColEx
(
row
,
pCol
->
colId
,
pCol
->
type
,
TD_DATA_ROW_HEAD_SIZE
+
pCol
->
offset
,
&
kvIdx
);
memcpy
(
POINTER_SHIFT
(
pColData
->
pData
,
curRow
*
pCol
->
bytes
),
val
,
pCol
->
bytes
);
// void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
SCellVal
sVal
=
{
0
};
if
(
!
tdSTSRowIterNext
(
&
iter
,
pCol
->
colId
,
pCol
->
type
,
&
sVal
))
{
// TODO: reach end
break
;
}
memcpy
(
POINTER_SHIFT
(
pColData
->
pData
,
curRow
*
pCol
->
bytes
),
sVal
.
val
,
pCol
->
bytes
);
}
curRow
++
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
d71fece2
...
...
@@ -1162,6 +1162,13 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
(
*
tDataTypes
[
pDataCol
->
type
].
statisFunc
)(
pDataCol
->
pData
,
rowsToWrite
,
&
(
pBlockCol
->
min
),
&
(
pBlockCol
->
max
),
&
(
pBlockCol
->
sum
),
&
(
pBlockCol
->
minIndex
),
&
(
pBlockCol
->
maxIndex
),
&
(
pBlockCol
->
numOfNull
));
if
(
pBlockCol
->
numOfNull
==
0
)
{
pBlockCol
->
bitmap
=
1
;
}
else
{
pBlockCol
->
bitmap
=
0
;
}
}
else
{
pBlockCol
->
bitmap
=
0
;
}
nColsNotAllNull
++
;
}
...
...
@@ -1174,6 +1181,9 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
int32_t
tsize
=
TSDB_BLOCK_STATIS_SIZE
(
nColsNotAllNull
);
int32_t
lsize
=
tsize
;
int32_t
keyLen
=
0
;
int32_t
nBitmaps
=
(
int32_t
)
TD_BITMAP_BYTES
(
rowsToWrite
);
int32_t
tBitmaps
=
0
;
for
(
int
ncol
=
0
;
ncol
<
pDataCols
->
numOfCols
;
ncol
++
)
{
// All not NULL columns finish
if
(
ncol
!=
0
&&
tcol
>=
nColsNotAllNull
)
break
;
...
...
@@ -1185,9 +1195,23 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
int32_t
flen
;
// final length
int32_t
tlen
=
dataColGetNEleLen
(
pDataCol
,
rowsToWrite
);
#ifdef TD_SUPPORT_BITMAP
tlen
+=
(
int32_t
)
TD_BITMAP_BYTES
(
rowsToWrite
);
int32_t
tBitmaps
=
0
;
if
((
ncol
!=
0
)
&&
(
pBlockCol
->
bitmap
==
0
))
{
if
(
IS_VAR_DATA_TYPE
(
pDataCol
->
type
))
{
tBitmaps
=
nBitmaps
;
tlen
+=
tBitmaps
;
}
else
{
tBitmaps
=
(
int32_t
)
ceil
((
double
)
nBitmaps
/
TYPE_BYTES
[
pDataCol
->
type
]);
tlen
+=
tBitmaps
*
TYPE_BYTES
[
pDataCol
->
type
];
}
// move bitmap parts ahead
// TODO: put bitmap part to the 1st location(pBitmap points to pData) to avoid the memmove
memcpy
(
POINTER_SHIFT
(
pDataCol
->
pData
,
pDataCol
->
len
),
pDataCol
->
pBitmap
,
nBitmaps
);
}
#endif
void
*
tptr
;
// Make room
...
...
@@ -1204,7 +1228,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
// Compress or just copy
if
(
pCfg
->
compression
)
{
flen
=
(
*
(
tDataTypes
[
pDataCol
->
type
].
compFunc
))((
char
*
)
pDataCol
->
pData
,
tlen
,
rowsToWrite
,
tptr
,
flen
=
(
*
(
tDataTypes
[
pDataCol
->
type
].
compFunc
))((
char
*
)
pDataCol
->
pData
,
tlen
,
rowsToWrite
+
tBitmaps
,
tptr
,
tlen
+
COMP_OVERFLOW_BYTES
,
pCfg
->
compression
,
*
ppCBuf
,
tlen
+
COMP_OVERFLOW_BYTES
);
}
else
{
...
...
@@ -1495,11 +1519,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
while
(
true
)
{
key1
=
(
*
iter
>=
pDataCols
->
numOfRows
)
?
INT64_MAX
:
dataColsKeyAt
(
pDataCols
,
*
iter
);
S
MemRow
row
=
tsdbNextIterRow
(
pCommitIter
->
pIter
);
if
(
row
==
NULL
||
memRowKey
(
row
)
>
maxKey
)
{
S
TSRow
*
row
=
tsdbNextIterRow
(
pCommitIter
->
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
key2
=
INT64_MAX
;
}
else
{
key2
=
memRowKey
(
row
);
key2
=
TD_ROW_KEY
(
row
);
}
if
(
key1
==
INT64_MAX
&&
key2
==
INT64_MAX
)
break
;
...
...
@@ -1507,19 +1531,22 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
if
(
key1
<
key2
)
{
for
(
int
i
=
0
;
i
<
pDataCols
->
numOfCols
;
i
++
)
{
// TODO: dataColAppendVal may fail
dataColAppendVal
(
pTarget
->
cols
+
i
,
tdGetColDataOfRow
(
pDataCols
->
cols
+
i
,
*
iter
),
pTarget
->
numOfRows
,
pTarget
->
maxPoints
);
SCellVal
sVal
=
{
0
};
if
(
tdGetColDataOfRow
(
&
sVal
,
pDataCols
->
cols
+
i
,
*
iter
)
<
0
)
{
TASSERT
(
0
);
}
tdAppendValToDataCol
(
pTarget
->
cols
+
i
,
sVal
.
valType
,
sVal
.
val
,
pTarget
->
numOfRows
,
pTarget
->
maxPoints
);
}
pTarget
->
numOfRows
++
;
(
*
iter
)
++
;
}
else
if
(
key1
>
key2
)
{
if
(
pSchema
==
NULL
||
schemaVersion
(
pSchema
)
!=
memRowVersion
(
row
))
{
pSchema
=
tsdbGetTableSchemaImpl
(
pCommitIter
->
pTable
,
false
,
false
,
memRowVersion
(
row
));
if
(
pSchema
==
NULL
||
schemaVersion
(
pSchema
)
!=
TD_ROW_SVER
(
row
))
{
pSchema
=
tsdbGetTableSchemaImpl
(
pCommitIter
->
pTable
,
false
,
false
,
TD_ROW_SVER
(
row
));
ASSERT
(
pSchema
!=
NULL
);
}
tdAppend
Mem
RowToDataCol
(
row
,
pSchema
,
pTarget
,
true
);
tdAppend
STS
RowToDataCol
(
row
,
pSchema
,
pTarget
,
true
);
tSkipListIterNext
(
pCommitIter
->
pIter
);
}
else
{
...
...
@@ -1527,20 +1554,23 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
// copy disk data
for
(
int
i
=
0
;
i
<
pDataCols
->
numOfCols
;
i
++
)
{
// TODO: dataColAppendVal may fail
dataColAppendVal
(
pTarget
->
cols
+
i
,
tdGetColDataOfRow
(
pDataCols
->
cols
+
i
,
*
iter
),
pTarget
->
numOfRows
,
pTarget
->
maxPoints
);
SCellVal
sVal
=
{
0
};
if
(
tdGetColDataOfRow
(
&
sVal
,
pDataCols
->
cols
+
i
,
*
iter
)
<
0
)
{
TASSERT
(
0
);
}
tdAppendValToDataCol
(
pTarget
->
cols
+
i
,
sVal
.
valType
,
sVal
.
val
,
pTarget
->
numOfRows
,
pTarget
->
maxPoints
);
}
if
(
update
==
TD_ROW_DISCARD_UPDATE
)
pTarget
->
numOfRows
++
;
}
if
(
update
!=
TD_ROW_DISCARD_UPDATE
)
{
// copy mem data
if
(
pSchema
==
NULL
||
schemaVersion
(
pSchema
)
!=
memRowVersion
(
row
))
{
pSchema
=
tsdbGetTableSchemaImpl
(
pCommitIter
->
pTable
,
false
,
false
,
memRowVersion
(
row
));
if
(
pSchema
==
NULL
||
schemaVersion
(
pSchema
)
!=
TD_ROW_SVER
(
row
))
{
pSchema
=
tsdbGetTableSchemaImpl
(
pCommitIter
->
pTable
,
false
,
false
,
TD_ROW_SVER
(
row
));
ASSERT
(
pSchema
!=
NULL
);
}
tdAppend
Mem
RowToDataCol
(
row
,
pSchema
,
pTarget
,
update
==
TD_ROW_OVERWRITE_UPDATE
);
tdAppend
STS
RowToDataCol
(
row
,
pSchema
,
pTarget
,
update
==
TD_ROW_OVERWRITE_UPDATE
);
}
(
*
iter
)
++
;
tSkipListIterNext
(
pCommitIter
->
pIter
);
...
...
source/dnode/vnode/src/tsdb/tsdbMain.c
浏览文件 @
d71fece2
...
...
@@ -760,7 +760,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
int numColumns;
int32_t blockIdx;
SDataStatis* pBlockStatis = NULL;
S
MemRow
row = NULL;
S
TSRow*
row = NULL;
// restore last column data with last schema
int err = 0;
...
...
@@ -866,7 +866,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
pDataCol = pReadh->pDCols[0]->cols + 0;
pCol = schemaColAt(pSchema, 0);
tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->offset);
pLastCol->ts =
memRowKey
(row);
pLastCol->ts =
TD_ROW_KEY
(row);
pTable->restoreColumnNum += 1;
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
d71fece2
...
...
@@ -22,7 +22,7 @@ static void tsdbFreeTbData(STbData *pTbData);
static
char
*
tsdbGetTsTupleKey
(
const
void
*
data
);
static
int
tsdbTbDataComp
(
const
void
*
arg1
,
const
void
*
arg2
);
static
char
*
tsdbTbDataGetUid
(
const
void
*
arg
);
static
int
tsdbAppendTableRowToCols
(
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
S
MemRow
row
);
static
int
tsdbAppendTableRowToCols
(
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
S
TSRow
*
row
);
STsdbMemTable
*
tsdbNewMemTable
(
STsdb
*
pTsdb
)
{
STsdbMemTable
*
pMemTable
=
(
STsdbMemTable
*
)
calloc
(
1
,
sizeof
(
*
pMemTable
));
...
...
@@ -124,7 +124,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
TSKEY
fKey
=
0
;
bool
isRowDel
=
false
;
int
filterIter
=
0
;
S
MemRow
row
=
NULL
;
S
TSRow
*
row
=
NULL
;
SMergeInfo
mInfo
;
if
(
pMergeInfo
==
NULL
)
pMergeInfo
=
&
mInfo
;
...
...
@@ -135,12 +135,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
if
(
pCols
)
tdResetDataCols
(
pCols
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
memRowKey
(
row
)
>
maxKey
)
{
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
memRowKey
(
row
);
isRowDel
=
memRowDeleted
(
row
);
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
if
(
filterIter
>=
nFilterKeys
)
{
...
...
@@ -177,12 +177,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
tSkipListIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
memRowKey
(
row
)
>
maxKey
)
{
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
memRowKey
(
row
);
isRowDel
=
memRowDeleted
(
row
);
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
}
else
{
if
(
isRowDel
)
{
...
...
@@ -207,12 +207,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
tSkipListIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
memRowKey
(
row
)
>
maxKey
)
{
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
memRowKey
(
row
);
isRowDel
=
memRowDeleted
(
row
);
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
filterIter
++
;
...
...
@@ -233,7 +233,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) {
SSubmitMsgIter
msgIter
=
{
0
};
SSubmitBlk
*
pBlock
=
NULL
;
SSubmitBlkIter
blkIter
=
{
0
};
S
MemRow
row
=
NULL
;
S
TSRow
*
row
=
NULL
;
TSKEY
now
=
taosGetTimestamp
(
pTsdb
->
config
.
precision
);
TSKEY
minKey
=
now
-
tsTickPerDay
[
pTsdb
->
config
.
precision
]
*
pTsdb
->
config
.
keep
;
TSKEY
maxKey
=
now
+
tsTickPerDay
[
pTsdb
->
config
.
precision
]
*
pTsdb
->
config
.
daysPerFile
;
...
...
@@ -306,7 +306,7 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p
STsdbMemTable
*
pMemTable
=
pTsdb
->
mem
;
void
*
tptr
;
STbData
*
pTbData
;
S
MemRow
row
;
S
TSRow
*
row
;
TSKEY
keyMin
;
TSKEY
keyMax
;
...
...
@@ -332,12 +332,12 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p
tInitSubmitBlkIter
(
pBlock
,
&
blkIter
);
if
(
blkIter
.
row
==
NULL
)
return
0
;
keyMin
=
memRowKey
(
blkIter
.
row
);
keyMin
=
TD_ROW_KEY
(
blkIter
.
row
);
tSkipListPutBatchByIter
(
pTbData
->
pData
,
&
blkIter
,
(
iter_next_fn_t
)
tGetSubmitBlkNext
);
// Set statistics
keyMax
=
memRowKey
(
blkIter
.
row
);
keyMax
=
TD_ROW_KEY
(
blkIter
.
row
);
pTbData
->
nrows
+=
pBlock
->
numOfRows
;
if
(
pTbData
->
keyMin
>
keyMin
)
pTbData
->
keyMin
=
keyMin
;
...
...
@@ -347,7 +347,7 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p
if
(
pMemTable
->
keyMin
>
keyMin
)
pMemTable
->
keyMin
=
keyMin
;
if
(
pMemTable
->
keyMax
<
keyMax
)
pMemTable
->
keyMax
=
keyMax
;
// S
MemRow
lastRow = NULL;
// S
TSRow*
lastRow = NULL;
// int64_t osize = SL_SIZE(pTableData->pData);
// tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow);
// tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext);
...
...
@@ -355,7 +355,7 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p
// (*pAffectedRows) += points;
// if(lastRow != NULL) {
// TSKEY lastRowKey =
memRowKey
(lastRow);
// TSKEY lastRowKey =
TD_ROW_KEY
(lastRow);
// if (pMemTable->keyFirst > firstRowKey) pMemTable->keyFirst = firstRowKey;
// pMemTable->numOfRows += dsize;
...
...
@@ -418,7 +418,7 @@ static void tsdbFreeTbData(STbData *pTbData) {
}
}
static
char
*
tsdbGetTsTupleKey
(
const
void
*
data
)
{
return
memRowKeys
((
SMemRow
)
data
);
}
static
char
*
tsdbGetTsTupleKey
(
const
void
*
data
)
{
return
(
char
*
)
TD_ROW_KEY_ADDR
((
STSRow
*
)
data
);
}
static
int
tsdbTbDataComp
(
const
void
*
arg1
,
const
void
*
arg2
)
{
STbData
*
pTbData1
=
(
STbData
*
)
arg1
;
...
...
@@ -437,17 +437,17 @@ static char *tsdbTbDataGetUid(const void *arg) {
STbData
*
pTbData
=
(
STbData
*
)
arg
;
return
(
char
*
)(
&
(
pTbData
->
uid
));
}
static
int
tsdbAppendTableRowToCols
(
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
S
MemRow
row
)
{
static
int
tsdbAppendTableRowToCols
(
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
S
TSRow
*
row
)
{
if
(
pCols
)
{
if
(
*
ppSchema
==
NULL
||
schemaVersion
(
*
ppSchema
)
!=
memRowVersion
(
row
))
{
*
ppSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
memRowVersion
(
row
));
if
(
*
ppSchema
==
NULL
||
schemaVersion
(
*
ppSchema
)
!=
TD_ROW_SVER
(
row
))
{
*
ppSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
TD_ROW_SVER
(
row
));
if
(
*
ppSchema
==
NULL
)
{
ASSERT
(
false
);
return
-
1
;
}
}
tdAppend
Mem
RowToDataCol
(
row
,
*
ppSchema
,
pCols
,
true
);
tdAppend
STS
RowToDataCol
(
row
,
*
ppSchema
,
pCols
,
true
);
}
return
0
;
...
...
@@ -479,7 +479,7 @@ int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg) {
typedef struct {
int32_t totalLen;
int32_t len;
S
MemRow
row;
S
TSRow*
row;
} SSubmitBlkIter;
typedef struct {
...
...
@@ -493,17 +493,17 @@ static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData* tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, S
MemRow
row);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, S
TSRow*
row);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
static S
MemRow
tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static S
TSRow*
tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows);
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, S
MemRow
row);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, S
TSRow*
row);
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, S
MemRow
row, TSKEY minKey, TSKEY maxKey,
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, S
TSRow*
row, TSKEY minKey, TSKEY maxKey,
TSKEY now);
...
...
@@ -685,7 +685,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
TSKEY fKey = 0;
bool isRowDel = false;
int filterIter = 0;
S
MemRow
row = NULL;
S
TSRow*
row = NULL;
SMergeInfo mInfo;
if (pMergeInfo == NULL) pMergeInfo = &mInfo;
...
...
@@ -696,11 +696,11 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
if (pCols) tdResetDataCols(pCols);
row = tsdbNextIterRow(pIter);
if (row == NULL ||
memRowKey
(row) > maxKey) {
if (row == NULL ||
TD_ROW_KEY
(row) > maxKey) {
rowKey = INT64_MAX;
isRowDel = false;
} else {
rowKey =
memRowKey
(row);
rowKey =
TD_ROW_KEY
(row);
isRowDel = memRowDeleted(row);
}
...
...
@@ -738,11 +738,11 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
tSkipListIterNext(pIter);
row = tsdbNextIterRow(pIter);
if (row == NULL ||
memRowKey
(row) > maxKey) {
if (row == NULL ||
TD_ROW_KEY
(row) > maxKey) {
rowKey = INT64_MAX;
isRowDel = false;
} else {
rowKey =
memRowKey
(row);
rowKey =
TD_ROW_KEY
(row);
isRowDel = memRowDeleted(row);
}
} else {
...
...
@@ -768,11 +768,11 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
tSkipListIterNext(pIter);
row = tsdbNextIterRow(pIter);
if (row == NULL ||
memRowKey
(row) > maxKey) {
if (row == NULL ||
TD_ROW_KEY
(row) > maxKey) {
rowKey = INT64_MAX;
isRowDel = false;
} else {
rowKey =
memRowKey
(row);
rowKey =
TD_ROW_KEY
(row);
isRowDel = memRowDeleted(row);
}
...
...
@@ -790,9 +790,9 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
// ---------------- LOCAL FUNCTIONS ----------------
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, S
MemRow
row, TSKEY minKey, TSKEY maxKey,
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, S
TSRow*
row, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
TSKEY rowKey =
memRowKey
(row);
TSKEY rowKey =
TD_ROW_KEY
(row);
if (rowKey < minKey || rowKey > maxKey) {
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64,
...
...
@@ -807,9 +807,9 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMem
//row1 has higher priority
static S
MemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow
row2, STsdbRepo* pRepo,
static S
TSRow* tsdbInsertDupKeyMerge(STSRow* row1, STSRow*
row2, STsdbRepo* pRepo,
STSchema **ppSchema1, STSchema **ppSchema2,
STable* pTable, int32_t* pPoints, S
MemRow
* pLastRow) {
STable* pTable, int32_t* pPoints, S
TSRow*
* pLastRow) {
//for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
if(row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) {
...
...
@@ -819,10 +819,10 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
"updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
memRowKey
(row1));
TD_ROW_KEY
(row1));
if(row2 == NULL || pRepo->config.update != TD_ROW_PARTIAL_UPDATE) {
void* pMem = tsdbAllocBytes(pRepo,
memRowTLen
(row1));
void* pMem = tsdbAllocBytes(pRepo,
TD_ROW_LEN
(row1));
if(pMem == NULL) return NULL;
memRowCpy(pMem, row1);
(*pPoints)++;
...
...
@@ -853,9 +853,9 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep
}
}
S
MemRow
tmp = tsdbMergeTwoRows(pBuf, row1, row2, pSchema1, pSchema2);
S
TSRow*
tmp = tsdbMergeTwoRows(pBuf, row1, row2, pSchema1, pSchema2);
void* pMem = tsdbAllocBytes(pRepo,
memRowTLen
(tmp));
void* pMem = tsdbAllocBytes(pRepo,
TD_ROW_LEN
(tmp));
if(pMem == NULL) return NULL;
memRowCpy(pMem, tmp);
...
...
@@ -868,7 +868,7 @@ static void* tsdbInsertDupKeyMergePacked(void** args) {
return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema**)&args[3], (STSchema**)&args[4], args[5], args[6], args[7]);
}
static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pPoints, S
MemRow
* pLastRow) {
static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pPoints, S
TSRow*
* pLastRow) {
if(pSkipList->insertHandleFn == NULL) {
tGenericSavedFunc *dupHandleSavedFunc = genericSavedFuncInit((GenericVaFunc)&tsdbInsertDupKeyMergePacked, 9);
...
...
@@ -953,7 +953,7 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT
return 0;
}
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, S
MemRow
row) {
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, S
TSRow*
row) {
tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data,
memRowVersion(row));
...
...
@@ -1002,30 +1002,30 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
assert(pTCol->bytes >= bytes);
memcpy(pDataCol->pData, value, bytes);
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
pDataCol->ts =
memRowKey
(row);
pDataCol->ts =
TD_ROW_KEY
(row);
// unlock
TSDB_WUNLOCK_TABLE(pTable);
}
}
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, S
MemRow
row) {
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, S
TSRow*
row) {
STsdbCfg *pCfg = &pRepo->config;
// if cacheLastRow config has been reset, free the lastRow
if (!pCfg->cacheLastRow && pTable->lastRow != NULL) {
S
MemRow
cachedLastRow = pTable->lastRow;
S
TSRow*
cachedLastRow = pTable->lastRow;
TSDB_WLOCK_TABLE(pTable);
pTable->lastRow = NULL;
TSDB_WUNLOCK_TABLE(pTable);
taosTZfree(cachedLastRow);
}
if (tsdbGetTableLastKeyImpl(pTable) <=
memRowKey
(row)) {
if (tsdbGetTableLastKeyImpl(pTable) <=
TD_ROW_KEY
(row)) {
if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) {
S
MemRow
nrow = pTable->lastRow;
if (taosTSizeof(nrow) <
memRowTLen
(row)) {
S
MemRow
orow = nrow;
nrow = taosTMalloc(
memRowTLen
(row));
S
TSRow*
nrow = pTable->lastRow;
if (taosTSizeof(nrow) <
TD_ROW_LEN
(row)) {
S
TSRow*
orow = nrow;
nrow = taosTMalloc(
TD_ROW_LEN
(row));
if (nrow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
...
...
@@ -1033,18 +1033,18 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r
memRowCpy(nrow, row);
TSDB_WLOCK_TABLE(pTable);
pTable->lastKey =
memRowKey
(row);
pTable->lastKey =
TD_ROW_KEY
(row);
pTable->lastRow = nrow;
TSDB_WUNLOCK_TABLE(pTable);
taosTZfree(orow);
} else {
TSDB_WLOCK_TABLE(pTable);
pTable->lastKey =
memRowKey
(row);
pTable->lastKey =
TD_ROW_KEY
(row);
memRowCpy(nrow, row);
TSDB_WUNLOCK_TABLE(pTable);
}
} else {
pTable->lastKey =
memRowKey
(row);
pTable->lastKey =
TD_ROW_KEY
(row);
}
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
d71fece2
...
...
@@ -157,7 +157,7 @@ typedef struct STableGroupSupporter {
static
STimeWindow
updateLastrowForEachGroup
(
STableGroupInfo
*
groupList
);
static
int32_t
checkForCachedLastRow
(
STsdbReadHandle
*
pTsdbReadHandle
,
STableGroupInfo
*
groupList
);
static
int32_t
checkForCachedLast
(
STsdbReadHandle
*
pTsdbReadHandle
);
//
static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow
* pRes, TSKEY* lastKey);
//
static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow*
* pRes, TSKEY* lastKey);
static
void
changeQueryHandleForInterpQuery
(
tsdbReaderT
pHandle
);
static
void
doMergeTwoLevelData
(
STsdbReadHandle
*
pTsdbReadHandle
,
STableCheckInfo
*
pCheckInfo
,
SBlock
*
pBlock
);
...
...
@@ -689,8 +689,8 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iter
);
assert
(
node
!=
NULL
);
S
MemRow
row
=
(
SMemRow
)
SL_GET_NODE_DATA
(
node
);
TSKEY
key
=
memRowKey
(
row
);
// first timestamp in buffer
S
TSRow
*
row
=
(
STSRow
*
)
SL_GET_NODE_DATA
(
node
);
TSKEY
key
=
TD_ROW_KEY
(
row
);
// first timestamp in buffer
tsdbDebug
(
"%p uid:%"
PRId64
", check data in mem from skey:%"
PRId64
", order:%d, ts range in buf:%"
PRId64
"-%"
PRId64
", lastKey:%"
PRId64
", numOfRows:%"
PRId64
", %s"
,
pHandle
,
pCheckInfo
->
tableId
,
key
,
order
,
(
*
pMem
)
->
keyMin
,
(
*
pMem
)
->
keyMax
,
pCheckInfo
->
lastKey
,
(
*
pMem
)
->
nrows
,
pHandle
->
idStr
);
...
...
@@ -709,8 +709,8 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iiter
);
assert
(
node
!=
NULL
);
S
MemRow
row
=
(
SMemRow
)
SL_GET_NODE_DATA
(
node
);
TSKEY
key
=
memRowKey
(
row
);
// first timestamp in buffer
S
TSRow
*
row
=
(
STSRow
*
)
SL_GET_NODE_DATA
(
node
);
TSKEY
key
=
TD_ROW_KEY
(
row
);
// first timestamp in buffer
tsdbDebug
(
"%p uid:%"
PRId64
", check data in imem from skey:%"
PRId64
", order:%d, ts range in buf:%"
PRId64
"-%"
PRId64
", lastKey:%"
PRId64
", numOfRows:%"
PRId64
", %s"
,
pHandle
,
pCheckInfo
->
tableId
,
key
,
order
,
(
*
pIMem
)
->
keyMin
,
(
*
pIMem
)
->
keyMax
,
pCheckInfo
->
lastKey
,
(
*
pIMem
)
->
nrows
,
pHandle
->
idStr
);
...
...
@@ -733,18 +733,18 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
}
static
TSKEY
extractFirstTraverseKey
(
STableCheckInfo
*
pCheckInfo
,
int32_t
order
,
int32_t
update
)
{
S
MemRow
rmem
=
NULL
,
rimem
=
NULL
;
S
TSRow
*
rmem
=
NULL
,
*
rimem
=
NULL
;
if
(
pCheckInfo
->
iter
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iter
);
if
(
node
!=
NULL
)
{
rmem
=
(
S
MemRow
)
SL_GET_NODE_DATA
(
node
);
rmem
=
(
S
TSRow
*
)
SL_GET_NODE_DATA
(
node
);
}
}
if
(
pCheckInfo
->
iiter
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iiter
);
if
(
node
!=
NULL
)
{
rimem
=
(
S
MemRow
)
SL_GET_NODE_DATA
(
node
);
rimem
=
(
S
TSRow
*
)
SL_GET_NODE_DATA
(
node
);
}
}
...
...
@@ -754,16 +754,16 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
if
(
rmem
!=
NULL
&&
rimem
==
NULL
)
{
pCheckInfo
->
chosen
=
CHECKINFO_CHOSEN_MEM
;
return
memRowKey
(
rmem
);
return
TD_ROW_KEY
(
rmem
);
}
if
(
rmem
==
NULL
&&
rimem
!=
NULL
)
{
pCheckInfo
->
chosen
=
CHECKINFO_CHOSEN_IMEM
;
return
memRowKey
(
rimem
);
return
TD_ROW_KEY
(
rimem
);
}
TSKEY
r1
=
memRowKey
(
rmem
);
TSKEY
r2
=
memRowKey
(
rimem
);
TSKEY
r1
=
TD_ROW_KEY
(
rmem
);
TSKEY
r2
=
TD_ROW_KEY
(
rimem
);
if
(
r1
==
r2
)
{
if
(
update
==
TD_ROW_DISCARD_UPDATE
){
...
...
@@ -787,19 +787,19 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
}
}
static
S
MemRow
getSMemRowInTableMem
(
STableCheckInfo
*
pCheckInfo
,
int32_t
order
,
int32_t
update
,
SMemRow
*
extraRow
)
{
S
MemRow
rmem
=
NULL
,
rimem
=
NULL
;
static
S
TSRow
*
getSRowInTableMem
(
STableCheckInfo
*
pCheckInfo
,
int32_t
order
,
int32_t
update
,
STSRow
*
*
extraRow
)
{
S
TSRow
*
rmem
=
NULL
,
*
rimem
=
NULL
;
if
(
pCheckInfo
->
iter
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iter
);
if
(
node
!=
NULL
)
{
rmem
=
(
S
MemRow
)
SL_GET_NODE_DATA
(
node
);
rmem
=
(
S
TSRow
*
)
SL_GET_NODE_DATA
(
node
);
}
}
if
(
pCheckInfo
->
iiter
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
pCheckInfo
->
iiter
);
if
(
node
!=
NULL
)
{
rimem
=
(
S
MemRow
)
SL_GET_NODE_DATA
(
node
);
rimem
=
(
S
TSRow
*
)
SL_GET_NODE_DATA
(
node
);
}
}
...
...
@@ -817,8 +817,8 @@ static SMemRow getSMemRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order,
return
rimem
;
}
TSKEY
r1
=
memRowKey
(
rmem
);
TSKEY
r2
=
memRowKey
(
rimem
);
TSKEY
r1
=
TD_ROW_KEY
(
rmem
);
TSKEY
r2
=
TD_ROW_KEY
(
rimem
);
if
(
r1
==
r2
)
{
if
(
update
==
TD_ROW_DISCARD_UPDATE
)
{
...
...
@@ -831,7 +831,7 @@ static SMemRow getSMemRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order,
return
rmem
;
}
else
{
pCheckInfo
->
chosen
=
CHECKINFO_CHOSEN_BOTH
;
extraRow
=
rimem
;
*
extraRow
=
rimem
;
return
rmem
;
}
}
else
{
...
...
@@ -904,12 +904,12 @@ static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
initTableMemIterator
(
pHandle
,
pCheckInfo
);
}
S
MemRow
row
=
getSMem
RowInTableMem
(
pCheckInfo
,
pHandle
->
order
,
pCfg
->
update
,
NULL
);
S
TSRow
*
row
=
getS
RowInTableMem
(
pCheckInfo
,
pHandle
->
order
,
pCfg
->
update
,
NULL
);
if
(
row
==
NULL
)
{
return
false
;
}
pCheckInfo
->
lastKey
=
memRowKey
(
row
);
// first timestamp in buffer
pCheckInfo
->
lastKey
=
TD_ROW_KEY
(
row
);
// first timestamp in buffer
tsdbDebug
(
"%p uid:%"
PRId64
", check data in buffer from skey:%"
PRId64
", order:%d, %s"
,
pHandle
,
pCheckInfo
->
tableId
,
pCheckInfo
->
lastKey
,
pHandle
->
order
,
pHandle
->
idStr
);
...
...
@@ -1418,8 +1418,11 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
// todo refactor, only copy one-by-one
for
(
int32_t
k
=
start
;
k
<
num
+
start
;
++
k
)
{
const
char
*
p
=
tdGetColDataOfRow
(
src
,
k
);
memcpy
(
dst
,
p
,
varDataTLen
(
p
));
SCellVal
sVal
=
{
0
};
if
(
tdGetColDataOfRow
(
&
sVal
,
src
,
k
)
<
0
){
TASSERT
(
0
);
}
memcpy
(
dst
,
sVal
.
val
,
varDataTLen
(
sVal
.
val
));
dst
+=
bytes
;
}
}
...
...
@@ -1470,16 +1473,17 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t
}
// Note: row1 always has high priority
static
void
mergeTwoRowFromMem
(
STsdbReadHandle
*
pTsdbReadHandle
,
int32_t
capacity
,
int32_t
numOfRows
,
SMemRow
row1
,
SMemRow
row2
,
int32_t
numOfCols
,
uint64_t
uid
,
STSchema
*
pSchema1
,
STSchema
*
pSchema2
,
bool
forceSetNull
)
{
static
void
mergeTwoRowFromMem
(
STsdbReadHandle
*
pTsdbReadHandle
,
int32_t
capacity
,
int32_t
numOfRows
,
STSRow
*
row1
,
STSRow
*
row2
,
int32_t
numOfCols
,
uint64_t
uid
,
STSchema
*
pSchema1
,
STSchema
*
pSchema2
,
bool
forceSetNull
)
{
#if 0
char* pData = NULL;
STSchema* pSchema;
S
MemRow
row
;
S
TSRow*
row;
int16_t colId;
int16_t offset;
bool
isRow1DataRow
=
isDataRow
(
row1
);
bool isRow1DataRow =
TD_IS_TP_ROW
(row1);
bool isRow2DataRow;
bool isChosenRowDataRow;
int32_t chosen_itr;
...
...
@@ -1495,19 +1499,19 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
if(isRow1DataRow) {
numOfColsOfRow1 = schemaNCols(pSchema1);
} else {
numOfColsOfRow1
=
kvRowNCols
(
memRowKvBody
(
row1
)
);
numOfColsOfRow1 =
TD_ROW_NCOLS(row1
);
}
int32_t numOfColsOfRow2 = 0;
if(row2) {
isRow2DataRow
=
isDataRow
(
row2
);
isRow2DataRow =
TD_IS_TP_ROW
(row2);
if (pSchema2 == NULL) {
pSchema2 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, 0);
}
if(isRow2DataRow) {
numOfColsOfRow2 = schemaNCols(pSchema2);
} else {
numOfColsOfRow2
=
kvRowNCols
(
memRowKvBody
(
row2
)
);
numOfColsOfRow2 =
TD_ROW_NCOLS(row2
);
}
}
...
...
@@ -1669,6 +1673,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
i++;
}
}
#endif
}
static
void
moveDataToFront
(
STsdbReadHandle
*
pTsdbReadHandle
,
int32_t
numOfRows
,
int32_t
numOfCols
)
{
...
...
@@ -1851,13 +1856,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
else
if
(
pCheckInfo
->
iter
!=
NULL
||
pCheckInfo
->
iiter
!=
NULL
)
{
SSkipListNode
*
node
=
NULL
;
do
{
S
MemRow
row2
=
NULL
;
S
MemRow
row1
=
getSMem
RowInTableMem
(
pCheckInfo
,
pTsdbReadHandle
->
order
,
pCfg
->
update
,
&
row2
);
S
TSRow
*
row2
=
NULL
;
S
TSRow
*
row1
=
getS
RowInTableMem
(
pCheckInfo
,
pTsdbReadHandle
->
order
,
pCfg
->
update
,
&
row2
);
if
(
row1
==
NULL
)
{
break
;
}
TSKEY
key
=
memRowKey
(
row1
);
TSKEY
key
=
TD_ROW_KEY
(
row1
);
if
((
key
>
pTsdbReadHandle
->
window
.
ekey
&&
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
||
(
key
<
pTsdbReadHandle
->
window
.
ekey
&&
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)))
{
break
;
...
...
@@ -1870,13 +1875,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
if
((
key
<
tsArray
[
pos
]
&&
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
||
(
key
>
tsArray
[
pos
]
&&
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)))
{
if
(
rv1
!=
memRowVersion
(
row1
))
{
if
(
rv1
!=
TD_ROW_SVER
(
row1
))
{
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
rv1
=
memRowVersion
(
row1
);
rv1
=
TD_ROW_SVER
(
row1
);
}
if
(
row2
&&
rv2
!=
memRowVersion
(
row2
))
{
if
(
row2
&&
rv2
!=
TD_ROW_SVER
(
row2
))
{
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2
=
memRowVersion
(
row2
);
rv2
=
TD_ROW_SVER
(
row2
);
}
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
numOfRows
,
row1
,
row2
,
numOfCols
,
pCheckInfo
->
tableId
,
pSchema1
,
pSchema2
,
true
);
...
...
@@ -1895,13 +1900,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
if
(
pCfg
->
update
==
TD_ROW_PARTIAL_UPDATE
)
{
doCopyRowsFromFileBlock
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
numOfRows
,
pos
,
pos
);
}
if
(
rv1
!=
memRowVersion
(
row1
))
{
if
(
rv1
!=
TD_ROW_SVER
(
row1
))
{
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
rv1
=
memRowVersion
(
row1
);
rv1
=
TD_ROW_SVER
(
row1
);
}
if
(
row2
&&
rv2
!=
memRowVersion
(
row2
))
{
if
(
row2
&&
rv2
!=
TD_ROW_SVER
(
row2
))
{
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2
=
memRowVersion
(
row2
);
rv2
=
TD_ROW_SVER
(
row2
);
}
bool
forceSetNull
=
pCfg
->
update
!=
TD_ROW_PARTIAL_UPDATE
;
...
...
@@ -1954,9 +1959,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
* copy them all to result buffer, since it may be overlapped with file data block.
*/
if
(
node
==
NULL
||
((
memRowKey
((
SMemRow
)
SL_GET_NODE_DATA
(
node
))
>
pTsdbReadHandle
->
window
.
ekey
)
&&
((
TD_ROW_KEY
((
STSRow
*
)
SL_GET_NODE_DATA
(
node
))
>
pTsdbReadHandle
->
window
.
ekey
)
&&
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
||
((
memRowKey
((
SMemRow
)
SL_GET_NODE_DATA
(
node
))
<
pTsdbReadHandle
->
window
.
ekey
)
&&
((
TD_ROW_KEY
((
STSRow
*
)
SL_GET_NODE_DATA
(
node
))
<
pTsdbReadHandle
->
window
.
ekey
)
&&
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)))
{
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
if
(
cur
->
win
.
skey
==
TSKEY_INITIAL_VAL
)
{
...
...
@@ -2541,12 +2546,12 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
STSchema
*
pSchema
=
NULL
;
do
{
S
MemRow
row
=
getSMem
RowInTableMem
(
pCheckInfo
,
pTsdbReadHandle
->
order
,
pCfg
->
update
,
NULL
);
S
TSRow
*
row
=
getS
RowInTableMem
(
pCheckInfo
,
pTsdbReadHandle
->
order
,
pCfg
->
update
,
NULL
);
if
(
row
==
NULL
)
{
break
;
}
TSKEY
key
=
memRowKey
(
row
);
TSKEY
key
=
TD_ROW_KEY
(
row
);
if
((
key
>
maxKey
&&
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
||
(
key
<
maxKey
&&
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)))
{
tsdbDebug
(
"%p key:%"
PRIu64
" beyond qrange:%"
PRId64
" - %"
PRId64
", no more data in buffer"
,
pTsdbReadHandle
,
key
,
pTsdbReadHandle
->
window
.
skey
,
pTsdbReadHandle
->
window
.
ekey
);
...
...
@@ -2559,9 +2564,9 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
}
win
->
ekey
=
key
;
if
(
rv
!=
memRowVersion
(
row
))
{
if
(
rv
!=
TD_ROW_SVER
(
row
))
{
pSchema
=
metaGetTbTSchema
(
pTsdbReadHandle
->
pTsdb
->
pMeta
,
pCheckInfo
->
tableId
,
0
);
rv
=
memRowVersion
(
row
);
rv
=
TD_ROW_SVER
(
row
);
}
mergeTwoRowFromMem
(
pTsdbReadHandle
,
maxRowsToRead
,
numOfRows
,
row
,
NULL
,
numOfCols
,
pCheckInfo
->
tableId
,
pSchema
,
NULL
,
true
);
...
...
@@ -2684,7 +2689,7 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
SQueryFilePos
*
cur
=
&
pTsdbReadHandle
->
cur
;
S
MemRow
pRow
=
NULL
;
S
TSRow
*
pRow
=
NULL
;
TSKEY
key
=
TSKEY_INITIAL_VAL
;
int32_t
step
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)
?
1
:-
1
;
...
...
@@ -3093,7 +3098,7 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) {
* if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW
* else set pRes and return TSDB_CODE_SUCCESS and save lastKey
*/
//
int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow
* pRes, TSKEY* lastKey) {
//
int32_t tsdbGetCachedLastRow(STable* pTable, STSRow*
* pRes, TSKEY* lastKey) {
// int32_t code = TSDB_CODE_SUCCESS;
//
// TSDB_RLOCK_TABLE(pTable);
...
...
@@ -3110,7 +3115,7 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) {
// }
// }
//
//out:
//
out:
// TSDB_RUNLOCK_TABLE(pTable);
// return code;
//}
...
...
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
浏览文件 @
d71fece2
...
...
@@ -21,7 +21,8 @@ static void tsdbResetReadTable(SReadH *pReadh);
static
void
tsdbResetReadFile
(
SReadH
*
pReadh
);
static
int
tsdbLoadBlockDataImpl
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SDataCols
*
pDataCols
);
static
int
tsdbCheckAndDecodeColumnData
(
SDataCol
*
pDataCol
,
void
*
content
,
int32_t
len
,
int8_t
comp
,
int
numOfRows
,
int
maxPoints
,
char
*
buffer
,
int
bufferSize
);
int
numOfBitmaps
,
int
lenOfBitmaps
,
int
maxPoints
,
char
*
buffer
,
int
bufferSize
);
static
int
tsdbLoadBlockDataColsImpl
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SDataCols
*
pDataCols
,
int16_t
*
colIds
,
int
numOfColIds
);
static
int
tsdbLoadColData
(
SReadH
*
pReadh
,
SDFile
*
pDFile
,
SBlock
*
pBlock
,
SBlockCol
*
pBlockCol
,
SDataCol
*
pDataCol
);
...
...
@@ -463,6 +464,8 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
// Recover the data
int
ccol
=
0
;
// loop iter for SBlockCol object
int
dcol
=
0
;
// loop iter for SDataCols object
int
nBitmaps
=
(
int
)
TD_BITMAP_BYTES
(
pBlock
->
numOfRows
);
SBlockCol
*
pBlockCol
=
NULL
;
while
(
dcol
<
pDataCols
->
numOfCols
)
{
SDataCol
*
pDataCol
=
&
(
pDataCols
->
cols
[
dcol
]);
if
(
dcol
!=
0
&&
ccol
>=
pBlockData
->
numOfCols
)
{
...
...
@@ -477,12 +480,26 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
int32_t
tlen
=
pBlock
->
keyLen
;
if
(
dcol
!=
0
)
{
SBlockCol
*
pBlockCol
=
&
(
pBlockData
->
cols
[
ccol
]);
pBlockCol
=
&
(
pBlockData
->
cols
[
ccol
]);
tcolId
=
pBlockCol
->
colId
;
toffset
=
tsdbGetBlockColOffset
(
pBlockCol
);
tlen
=
pBlockCol
->
len
;
pDataCol
->
bitmap
=
pBlockCol
->
bitmap
;
}
else
{
ASSERT
(
pDataCol
->
colId
==
tcolId
);
pDataCol
->
bitmap
=
1
;
}
int32_t
tBitmaps
=
0
;
int32_t
tLenBitmap
=
0
;
if
((
dcol
!=
0
)
&&
(
pBlockCol
->
bitmap
==
0
))
{
if
(
IS_VAR_DATA_TYPE
(
pDataCol
->
type
))
{
tBitmaps
=
nBitmaps
;
tLenBitmap
=
tBitmaps
;
}
else
{
tBitmaps
=
(
int32_t
)
ceil
((
double
)
nBitmaps
/
TYPE_BYTES
[
pDataCol
->
type
]);
tLenBitmap
=
tBitmaps
*
TYPE_BYTES
[
pDataCol
->
type
];
}
}
if
(
tcolId
==
pDataCol
->
colId
)
{
...
...
@@ -492,7 +509,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
}
if
(
tsdbCheckAndDecodeColumnData
(
pDataCol
,
POINTER_SHIFT
(
pBlockData
,
tsize
+
toffset
),
tlen
,
pBlock
->
algorithm
,
pBlock
->
numOfRows
,
pDataCols
->
maxPoints
,
TSDB_READ_COMP_BUF
(
pReadh
),
pBlock
->
numOfRows
,
tBitmaps
,
tLenBitmap
,
pDataCols
->
maxPoints
,
TSDB_READ_COMP_BUF
(
pReadh
),
(
int
)
taosTSizeof
(
TSDB_READ_COMP_BUF
(
pReadh
)))
<
0
)
{
tsdbError
(
"vgId:%d file %s is broken at column %d block offset %"
PRId64
" column offset %u"
,
TSDB_READ_REPO_ID
(
pReadh
),
TSDB_FILE_FULL_NAME
(
pDFile
),
tcolId
,
(
int64_t
)
pBlock
->
offset
,
toffset
);
...
...
@@ -516,7 +533,8 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
}
static
int
tsdbCheckAndDecodeColumnData
(
SDataCol
*
pDataCol
,
void
*
content
,
int32_t
len
,
int8_t
comp
,
int
numOfRows
,
int
maxPoints
,
char
*
buffer
,
int
bufferSize
)
{
int
numOfBitmaps
,
int
lenOfBitmaps
,
int
maxPoints
,
char
*
buffer
,
int
bufferSize
)
{
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
content
,
len
))
{
terrno
=
TSDB_CODE_TDB_FILE_CORRUPTED
;
return
-
1
;
...
...
@@ -527,8 +545,9 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
// Decode the data
if
(
comp
)
{
// Need to decompress
int
tlen
=
(
*
(
tDataTypes
[
pDataCol
->
type
].
decompFunc
))(
content
,
len
-
sizeof
(
TSCKSUM
),
numOfRows
,
pDataCol
->
pData
,
pDataCol
->
spaceSize
,
comp
,
buffer
,
bufferSize
);
int
tlen
=
(
*
(
tDataTypes
[
pDataCol
->
type
].
decompFunc
))(
content
,
len
-
sizeof
(
TSCKSUM
),
numOfRows
+
numOfBitmaps
,
pDataCol
->
pData
,
pDataCol
->
spaceSize
,
comp
,
buffer
,
bufferSize
);
if
(
tlen
<=
0
)
{
tsdbError
(
"Failed to decompress column, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d"
,
len
,
comp
,
numOfRows
,
maxPoints
,
bufferSize
);
...
...
@@ -542,9 +561,22 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
memcpy
(
pDataCol
->
pData
,
content
,
pDataCol
->
len
);
}
if
(
IS_VAR_DATA_TYPE
(
pDataCol
->
type
))
{
if
(
lenOfBitmaps
>
0
)
{
pDataCol
->
len
-=
lenOfBitmaps
;
void
*
pSrcBitmap
=
NULL
;
if
(
IS_VAR_DATA_TYPE
(
pDataCol
->
type
))
{
pSrcBitmap
=
dataColSetOffset
(
pDataCol
,
numOfRows
);
}
else
{
pSrcBitmap
=
POINTER_SHIFT
(
pDataCol
->
pData
,
numOfRows
*
TYPE_BYTES
[
pDataCol
->
type
]);
}
void
*
pDestBitmap
=
POINTER_SHIFT
(
pDataCol
->
pData
,
pDataCol
->
bytes
*
maxPoints
);
// restore the bitmap parts
memcpy
(
pDestBitmap
,
pSrcBitmap
,
lenOfBitmaps
);
}
else
if
(
IS_VAR_DATA_TYPE
(
pDataCol
->
type
))
{
dataColSetOffset
(
pDataCol
,
numOfRows
);
}
return
0
;
}
...
...
@@ -590,6 +622,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
if
(
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
// load the key row
blockCol
.
colId
=
colId
;
blockCol
.
bitmap
=
0
;
// default is NORM for the primary key column
blockCol
.
len
=
pBlock
->
keyLen
;
blockCol
.
type
=
pDataCol
->
type
;
blockCol
.
offset
=
TSDB_KEY_COL_OFFSET
;
...
...
@@ -617,6 +650,8 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
}
ASSERT
(
pBlockCol
->
colId
==
pDataCol
->
colId
);
// set the bitmap
pDataCol
->
bitmap
=
pBlockCol
->
bitmap
;
}
if
(
tsdbLoadColData
(
pReadh
,
pDFile
,
pBlock
,
pBlockCol
,
pDataCol
)
<
0
)
return
-
1
;
...
...
@@ -630,7 +665,22 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
STsdb
*
pRepo
=
TSDB_READ_REPO
(
pReadh
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
int
tsize
=
pDataCol
->
bytes
*
pBlock
->
numOfRows
+
COMP_OVERFLOW_BYTES
;
int
nBitmaps
=
(
int
)
TD_BITMAP_BYTES
(
pBlock
->
numOfRows
);
int32_t
tBitmaps
=
0
;
int32_t
tLenBitmap
=
0
;
if
(
pBlockCol
->
bitmap
==
0
)
{
if
(
IS_VAR_DATA_TYPE
(
pDataCol
->
type
))
{
tBitmaps
=
nBitmaps
;
tLenBitmap
=
tBitmaps
;
}
else
{
tBitmaps
=
(
int32_t
)
ceil
((
double
)
nBitmaps
/
TYPE_BYTES
[
pDataCol
->
type
]);
tLenBitmap
=
tBitmaps
*
TYPE_BYTES
[
pDataCol
->
type
];
}
}
int
tsize
=
pDataCol
->
bytes
*
pBlock
->
numOfRows
+
tLenBitmap
+
COMP_OVERFLOW_BYTES
;
if
(
tsdbMakeRoom
((
void
**
)(
&
TSDB_READ_BUF
(
pReadh
)),
pBlockCol
->
len
)
<
0
)
return
-
1
;
if
(
tsdbMakeRoom
((
void
**
)(
&
TSDB_READ_COMP_BUF
(
pReadh
)),
tsize
)
<
0
)
return
-
1
;
...
...
@@ -658,7 +708,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
}
if
(
tsdbCheckAndDecodeColumnData
(
pDataCol
,
pReadh
->
pBuf
,
pBlockCol
->
len
,
pBlock
->
algorithm
,
pBlock
->
numOfRows
,
pCfg
->
maxRowsPerFileBlock
,
pReadh
->
pCBuf
,
(
int32_t
)
taosTSizeof
(
pReadh
->
pCBuf
))
<
0
)
{
tBitmaps
,
tLenBitmap
,
pCfg
->
maxRowsPerFileBlock
,
pReadh
->
pCBuf
,
(
int32_t
)
taosTSizeof
(
pReadh
->
pCBuf
))
<
0
)
{
tsdbError
(
"vgId:%d file %s is broken at column %d offset %"
PRId64
,
REPO_ID
(
pRepo
),
TSDB_FILE_FULL_NAME
(
pDFile
),
pBlockCol
->
colId
,
offset
);
return
-
1
;
...
...
source/libs/parser/inc/dataBlockMgt.h
浏览文件 @
d71fece2
...
...
@@ -80,44 +80,37 @@ typedef struct STableDataBlocks {
STagData
tagData
;
SParsedDataColInfo
boundColumnInfo
;
S
MemRowBuilder
rowBuilder
;
S
RowBuilder
rowBuilder
;
}
STableDataBlocks
;
static
FORCE_INLINE
void
initSMemRow
(
SMemRow
row
,
uint8_t
memRowType
,
STableDataBlocks
*
pBlock
,
int16_t
nBoundCols
)
{
memRowSetType
(
row
,
memRowType
);
if
(
isDataRowT
(
memRowType
))
{
dataRowSetVersion
(
memRowDataBody
(
row
),
pBlock
->
pTableMeta
->
sversion
);
dataRowSetLen
(
memRowDataBody
(
row
),
(
TDRowLenT
)(
TD_DATA_ROW_HEAD_SIZE
+
pBlock
->
boundColumnInfo
.
flen
));
}
else
{
ASSERT
(
nBoundCols
>
0
);
memRowSetKvVersion
(
row
,
pBlock
->
pTableMeta
->
sversion
);
kvRowSetNCols
(
memRowKvBody
(
row
),
nBoundCols
);
kvRowSetLen
(
memRowKvBody
(
row
),
(
TDRowLenT
)(
TD_KV_ROW_HEAD_SIZE
+
sizeof
(
SColIdx
)
*
nBoundCols
));
}
}
static
FORCE_INLINE
int32_t
getExtendedRowSize
(
STableDataBlocks
*
pBlock
)
{
ASSERT
(
pBlock
->
rowSize
==
pBlock
->
pTableMeta
->
tableInfo
.
rowSize
);
return
pBlock
->
rowSize
+
TD_MEM_ROW_DATA_HEAD_SIZE
+
pBlock
->
boundColumnInfo
.
extendedVarLen
;
STableComInfo
*
pTableInfo
=
&
pBlock
->
pTableMeta
->
tableInfo
;
ASSERT
(
pBlock
->
rowSize
==
pTableInfo
->
rowSize
);
return
pBlock
->
rowSize
+
TD_ROW_HEAD_LEN
-
sizeof
(
TSKEY
)
+
pBlock
->
boundColumnInfo
.
extendedVarLen
+
(
int32_t
)
TD_BITMAP_BYTES
(
pTableInfo
->
numOfColumns
-
1
);
}
static
FORCE_INLINE
void
getMemRowAppendInfo
(
SSchema
*
pSchema
,
uint8_t
memR
owType
,
SParsedDataColInfo
*
spd
,
int32_t
idx
,
int32_t
*
toffset
)
{
static
FORCE_INLINE
void
getMemRowAppendInfo
(
SSchema
*
pSchema
,
uint8_t
r
owType
,
SParsedDataColInfo
*
spd
,
int32_t
idx
,
int32_t
*
toffset
,
int32_t
*
colIdx
)
{
int32_t
schemaIdx
=
0
;
if
(
IS_DATA_COL_ORDERED
(
spd
))
{
schemaIdx
=
spd
->
boundedColumns
[
idx
]
-
1
;
if
(
isDataRowT
(
memR
owType
))
{
if
(
TD_IS_TP_ROW_T
(
r
owType
))
{
*
toffset
=
(
spd
->
cols
+
schemaIdx
)
->
toffset
;
// the offset of firstPart
*
colIdx
=
schemaIdx
;
}
else
{
*
toffset
=
idx
*
sizeof
(
SColIdx
);
// the offset of SColIdx
*
colIdx
=
idx
;
}
}
else
{
ASSERT
(
idx
==
(
spd
->
colIdxInfo
+
idx
)
->
boundIdx
);
schemaIdx
=
(
spd
->
colIdxInfo
+
idx
)
->
schemaColIdx
;
if
(
isDataRowT
(
memR
owType
))
{
if
(
TD_IS_TP_ROW_T
(
r
owType
))
{
*
toffset
=
(
spd
->
cols
+
schemaIdx
)
->
toffset
;
*
colIdx
=
schemaIdx
;
}
else
{
*
toffset
=
((
spd
->
colIdxInfo
+
idx
)
->
finalIdx
)
*
sizeof
(
SColIdx
);
*
colIdx
=
(
spd
->
colIdxInfo
+
idx
)
->
finalIdx
;
}
}
}
...
...
@@ -141,7 +134,7 @@ void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t
void
destroyBoundColumnInfo
(
SParsedDataColInfo
*
pColList
);
void
destroyBlockArrayList
(
SArray
*
pDataBlockList
);
void
destroyBlockHashmap
(
SHashObj
*
pDataBlockHash
);
int
init
MemRowBuilder
(
SMemRowBuilder
*
pBuilder
,
uint32_t
nRows
,
SParsedDataColInfo
*
pColInfo
);
int
init
RowBuilder
(
SRowBuilder
*
pBuilder
,
int16_t
schemaVer
,
SParsedDataColInfo
*
pColInfo
);
int32_t
allocateMemIfNeed
(
STableDataBlocks
*
pDataBlock
,
int32_t
rowSize
,
int32_t
*
numOfRows
);
int32_t
getDataBlockFromList
(
SHashObj
*
pHashList
,
int64_t
id
,
int32_t
size
,
int32_t
startOffset
,
int32_t
rowSize
,
const
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
,
SArray
*
pBlockList
);
...
...
source/libs/parser/src/dataBlockMgt.c
浏览文件 @
d71fece2
...
...
@@ -174,81 +174,18 @@ int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int3
}
static
int32_t
getRowExpandSize
(
STableMeta
*
pTableMeta
)
{
int32_t
result
=
TD_
MEM_ROW_DATA_HEAD_SIZE
;
int32_t
result
=
TD_
ROW_HEAD_LEN
-
sizeof
(
TSKEY
)
;
int32_t
columns
=
getNumOfColumns
(
pTableMeta
);
SSchema
*
pSchema
=
getTableColumnSchema
(
pTableMeta
);
for
(
int32_t
i
=
0
;
i
<
columns
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
columns
;
++
i
)
{
if
(
IS_VAR_DATA_TYPE
((
pSchema
+
i
)
->
type
))
{
result
+=
TYPE_BYTES
[
TSDB_DATA_TYPE_BINARY
];
}
}
result
+=
(
int32_t
)
TD_BITMAP_BYTES
(
columns
-
1
);
return
result
;
}
/**
* TODO: Move to tdataformat.h and refactor when STSchema available.
* - fetch flen and toffset from STSChema and remove param spd
*/
static
FORCE_INLINE
void
convertToSDataRow
(
SMemRow
dest
,
SMemRow
src
,
SSchema
*
pSchema
,
int
nCols
,
SParsedDataColInfo
*
spd
)
{
ASSERT
(
isKvRow
(
src
));
SKVRow
kvRow
=
memRowKvBody
(
src
);
SDataRow
dataRow
=
memRowDataBody
(
dest
);
memRowSetType
(
dest
,
SMEM_ROW_DATA
);
dataRowSetVersion
(
dataRow
,
memRowKvVersion
(
src
));
dataRowSetLen
(
dataRow
,
(
TDRowLenT
)(
TD_DATA_ROW_HEAD_SIZE
+
spd
->
flen
));
int32_t
kvIdx
=
0
;
for
(
int
i
=
0
;
i
<
nCols
;
++
i
)
{
SSchema
*
schema
=
pSchema
+
i
;
void
*
val
=
tdGetKVRowValOfColEx
(
kvRow
,
schema
->
colId
,
&
kvIdx
);
tdAppendDataColVal
(
dataRow
,
val
!=
NULL
?
val
:
getNullValue
(
schema
->
type
),
true
,
schema
->
type
,
(
spd
->
cols
+
i
)
->
toffset
);
}
}
// TODO: Move to tdataformat.h and refactor when STSchema available.
static
FORCE_INLINE
void
convertToSKVRow
(
SMemRow
dest
,
SMemRow
src
,
SSchema
*
pSchema
,
int
nCols
,
int
nBoundCols
,
SParsedDataColInfo
*
spd
)
{
ASSERT
(
isDataRow
(
src
));
SDataRow
dataRow
=
memRowDataBody
(
src
);
SKVRow
kvRow
=
memRowKvBody
(
dest
);
memRowSetType
(
dest
,
SMEM_ROW_KV
);
memRowSetKvVersion
(
kvRow
,
dataRowVersion
(
dataRow
));
kvRowSetNCols
(
kvRow
,
nBoundCols
);
kvRowSetLen
(
kvRow
,
(
TDRowLenT
)(
TD_KV_ROW_HEAD_SIZE
+
sizeof
(
SColIdx
)
*
nBoundCols
));
int32_t
toffset
=
0
,
kvOffset
=
0
;
for
(
int
i
=
0
;
i
<
nCols
;
++
i
)
{
if
((
spd
->
cols
+
i
)
->
valStat
==
VAL_STAT_HAS
)
{
SSchema
*
schema
=
pSchema
+
i
;
toffset
=
(
spd
->
cols
+
i
)
->
toffset
;
void
*
val
=
tdGetRowDataOfCol
(
dataRow
,
schema
->
type
,
toffset
+
TD_DATA_ROW_HEAD_SIZE
);
tdAppendKvColVal
(
kvRow
,
val
,
true
,
schema
->
colId
,
schema
->
type
,
kvOffset
);
kvOffset
+=
sizeof
(
SColIdx
);
}
}
}
// TODO: Move to tdataformat.h and refactor when STSchema available.
static
FORCE_INLINE
void
convertSMemRow
(
SMemRow
dest
,
SMemRow
src
,
STableDataBlocks
*
pBlock
)
{
STableMeta
*
pTableMeta
=
pBlock
->
pTableMeta
;
STableComInfo
tinfo
=
getTableInfo
(
pTableMeta
);
SSchema
*
pSchema
=
getTableColumnSchema
(
pTableMeta
);
SParsedDataColInfo
*
spd
=
&
pBlock
->
boundColumnInfo
;
ASSERT
(
dest
!=
src
);
if
(
isDataRow
(
src
))
{
// TODO: Can we use pBlock -> numOfParam directly?
ASSERT
(
spd
->
numOfBound
>
0
);
convertToSKVRow
(
dest
,
src
,
pSchema
,
tinfo
.
numOfColumns
,
spd
->
numOfBound
,
spd
);
}
else
{
convertToSDataRow
(
dest
,
src
,
pSchema
,
tinfo
.
numOfColumns
,
spd
);
}
}
static
void
destroyDataBlock
(
STableDataBlocks
*
pDataBlock
)
{
if
(
pDataBlock
==
NULL
)
{
return
;
...
...
@@ -361,7 +298,7 @@ int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKey
char
*
pBlockData
=
pBlocks
->
data
;
int
n
=
0
;
while
(
n
<
nRows
)
{
pBlkKeyTuple
->
skey
=
memRowKey
(
pBlockData
);
pBlkKeyTuple
->
skey
=
TD_ROW_KEY
((
STSRow
*
)
pBlockData
);
pBlkKeyTuple
->
payloadAddr
=
pBlockData
;
// next loop
...
...
@@ -446,27 +383,29 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB
int32_t
numOfRows
=
pBlock
->
numOfRows
;
if
(
isRawPayload
)
{
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
SMemRow
memRow
=
(
SMemRow
)
pDataBlock
;
memRowSetType
(
memRow
,
SMEM_ROW_DATA
);
SDataRow
trow
=
memRowDataBody
(
memRow
);
dataRowSetLen
(
trow
,
(
uint16_t
)(
TD_DATA_ROW_HEAD_SIZE
+
flen
));
dataRowSetVersion
(
trow
,
pTableMeta
->
sversion
);
SRowBuilder
builder
=
{
0
};
tdSRowInit
(
&
builder
,
pTableMeta
->
sversion
);
tdSRowSetInfo
(
&
builder
,
getNumOfColumns
(
pTableMeta
),
-
1
,
flen
);
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
tdSRowResetBuf
(
&
builder
,
pDataBlock
);
int
toffset
=
0
;
for
(
int32_t
j
=
0
;
j
<
tinfo
.
numOfColumns
;
j
++
)
{
tdAppendColVal
(
trow
,
p
,
pSchema
[
j
].
type
,
toffset
);
toffset
+=
TYPE_BYTES
[
pSchema
[
j
].
type
];
for
(
int32_t
j
=
0
;
j
<
tinfo
.
numOfColumns
;
++
j
)
{
int8_t
colType
=
pSchema
[
j
].
type
;
uint8_t
valType
=
isNull
(
p
,
colType
)
?
TD_VTYPE_NULL
:
TD_VTYPE_NORM
;
tdAppendColValToRow
(
&
builder
,
pSchema
[
j
].
colId
,
colType
,
valType
,
p
,
true
,
toffset
,
j
);
toffset
+=
TYPE_BYTES
[
colType
];
p
+=
pSchema
[
j
].
bytes
;
}
pDataBlock
=
(
char
*
)
pDataBlock
+
memRowTLen
(
memRow
)
;
pBlock
->
dataLen
+=
memRowTLen
(
memRow
)
;
int32_t
rowLen
=
TD_ROW_LEN
((
STSRow
*
)
pDataBlock
);
pDataBlock
=
(
char
*
)
pDataBlock
+
rowLen
;
pBlock
->
dataLen
+=
rowLen
;
}
}
else
{
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
char
*
payload
=
(
blkKeyTuple
+
i
)
->
payloadAddr
;
TDRowLenT
rowTLen
=
memRowTLen
(
payload
);
TDRowLenT
rowTLen
=
TD_ROW_LEN
((
STSRow
*
)
payload
);
memcpy
(
pDataBlock
,
payload
,
rowTLen
);
pDataBlock
=
POINTER_SHIFT
(
pDataBlock
,
rowTLen
);
pBlock
->
dataLen
+=
rowTLen
;
...
...
@@ -587,16 +526,10 @@ int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t
return
TSDB_CODE_SUCCESS
;
}
int
initMemRowBuilder
(
SMemRowBuilder
*
pBuilder
,
uint32_t
nRows
,
SParsedDataColInfo
*
pColInfo
)
{
ASSERT
(
nRows
>=
0
&&
pColInfo
->
numOfCols
>
0
&&
(
pColInfo
->
numOfBound
<=
pColInfo
->
numOfCols
));
uint32_t
dataLen
=
TD_MEM_ROW_DATA_HEAD_SIZE
+
pColInfo
->
allNullLen
;
uint32_t
kvLen
=
TD_MEM_ROW_KV_HEAD_SIZE
+
pColInfo
->
numOfBound
*
sizeof
(
SColIdx
)
+
pColInfo
->
boundNullLen
;
if
(
isUtilizeKVRow
(
kvLen
,
dataLen
))
{
pBuilder
->
memRowType
=
SMEM_ROW_KV
;
}
else
{
pBuilder
->
memRowType
=
SMEM_ROW_DATA
;
}
int
initRowBuilder
(
SRowBuilder
*
pBuilder
,
int16_t
schemaVer
,
SParsedDataColInfo
*
pColInfo
)
{
ASSERT
(
pColInfo
->
numOfCols
>
0
&&
(
pColInfo
->
numOfBound
<=
pColInfo
->
numOfCols
));
tdSRowInit
(
pBuilder
,
schemaVer
);
tdSRowSetExtendedInfo
(
pBuilder
,
pColInfo
->
numOfCols
,
pColInfo
->
numOfBound
,
pColInfo
->
flen
,
pColInfo
->
allNullLen
,
pColInfo
->
boundNullLen
);
return
TSDB_CODE_SUCCESS
;
}
source/libs/parser/src/insertParser.c
浏览文件 @
d71fece2
...
...
@@ -259,28 +259,30 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time
}
typedef
struct
SMemParam
{
S
MemRow
row
;
S
RowBuilder
*
rb
;
SSchema
*
schema
;
int32_t
toffset
;
int32_t
colIdx
;
}
SMemParam
;
static
FORCE_INLINE
int32_t
MemRowAppend
(
const
void
*
value
,
int32_t
len
,
void
*
param
)
{
SMemParam
*
pa
=
(
SMemParam
*
)
param
;
static
FORCE_INLINE
int32_t
MemRowAppend
(
const
void
*
value
,
int32_t
len
,
void
*
param
)
{
SMemParam
*
pa
=
(
SMemParam
*
)
param
;
SRowBuilder
*
rb
=
pa
->
rb
;
if
(
TSDB_DATA_TYPE_BINARY
==
pa
->
schema
->
type
)
{
c
har
*
rowEnd
=
memRowEnd
(
pa
->
row
);
c
onst
char
*
rowEnd
=
tdRowEnd
(
rb
->
pBuf
);
STR_WITH_SIZE_TO_VARSTR
(
rowEnd
,
value
,
len
);
tdAppend
MemRowColVal
(
pa
->
row
,
rowEnd
,
true
,
pa
->
schema
->
colId
,
pa
->
schema
->
type
,
pa
->
toffset
);
tdAppend
ColValToRow
(
rb
,
pa
->
schema
->
colId
,
pa
->
schema
->
type
,
TD_VTYPE_NORM
,
rowEnd
,
false
,
pa
->
toffset
,
pa
->
colIdx
);
}
else
if
(
TSDB_DATA_TYPE_NCHAR
==
pa
->
schema
->
type
)
{
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t
output
=
0
;
c
har
*
rowEnd
=
memRowEnd
(
pa
->
row
);
if
(
!
taosMbsToUcs4
(
value
,
len
,
(
char
*
)
varDataVal
(
rowEnd
),
pa
->
schema
->
bytes
-
VARSTR_HEADER_SIZE
,
&
output
))
{
int32_t
output
=
0
;
c
onst
char
*
rowEnd
=
tdRowEnd
(
rb
->
pBuf
);
if
(
!
taosMbsToUcs4
(
value
,
len
,
(
char
*
)
varDataVal
(
rowEnd
),
pa
->
schema
->
bytes
-
VARSTR_HEADER_SIZE
,
&
output
))
{
return
TSDB_CODE_TSC_SQL_SYNTAX_ERROR
;
}
varDataSetLen
(
rowEnd
,
output
);
tdAppend
MemRowColVal
(
pa
->
row
,
rowEnd
,
false
,
pa
->
schema
->
colId
,
pa
->
schema
->
type
,
pa
->
toffset
);
tdAppend
ColValToRow
(
rb
,
pa
->
schema
->
colId
,
pa
->
schema
->
type
,
TD_VTYPE_NORM
,
rowEnd
,
false
,
pa
->
toffset
,
pa
->
colIdx
);
}
else
{
tdAppend
MemRowColVal
(
pa
->
row
,
value
,
true
,
pa
->
schema
->
colId
,
pa
->
schema
->
type
,
pa
->
toffset
);
tdAppend
ColValToRow
(
rb
,
pa
->
schema
->
colId
,
pa
->
schema
->
type
,
TD_VTYPE_NORM
,
value
,
true
,
pa
->
toffset
,
pa
->
colIdx
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -408,26 +410,27 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
return
TSDB_CODE_SUCCESS
;
}
static
int
parseOneRow
(
SInsertParseContext
*
pCxt
,
STableDataBlocks
*
pDataBlocks
,
int16_t
timePrec
,
int32_t
*
len
,
char
*
tmpTokenBuf
)
{
static
int
parseOneRow
(
SInsertParseContext
*
pCxt
,
STableDataBlocks
*
pDataBlocks
,
int16_t
timePrec
,
int32_t
*
len
,
char
*
tmpTokenBuf
)
{
SParsedDataColInfo
*
spd
=
&
pDataBlocks
->
boundColumnInfo
;
SMemRowBuilder
*
pBuilder
=
&
pDataBlocks
->
rowBuilder
;
char
*
row
=
pDataBlocks
->
pData
+
pDataBlocks
->
size
;
// skip the SSubmitBlk header
initSMemRow
(
row
,
pBuilder
->
memRowType
,
pDataBlocks
,
spd
->
numOfBound
);
SRowBuilder
*
pBuilder
=
&
pDataBlocks
->
rowBuilder
;
STSRow
*
row
=
(
STSRow
*
)(
pDataBlocks
->
pData
+
pDataBlocks
->
size
);
// skip the SSubmitBlk header
tdSRowResetBuf
(
pBuilder
,
row
);
bool
isParseBindParam
=
false
;
SSchema
*
schema
=
getTableColumnSchema
(
pDataBlocks
->
pTableMeta
);
SMemParam
param
=
{.
r
ow
=
row
};
SMemParam
param
=
{.
r
b
=
pBuilder
};
SToken
sToken
=
{
0
};
// 1. set the parsed value from sql string
for
(
int
i
=
0
;
i
<
spd
->
numOfBound
;
++
i
)
{
NEXT_TOKEN
(
pCxt
->
pSql
,
sToken
);
SSchema
*
pSchema
=
&
schema
[
spd
->
boundedColumns
[
i
]
-
1
];
param
.
schema
=
pSchema
;
getMemRowAppendInfo
(
schema
,
pBuilder
->
memRowType
,
spd
,
i
,
&
param
.
toffset
);
getMemRowAppendInfo
(
schema
,
pBuilder
->
rowType
,
spd
,
i
,
&
param
.
toffset
,
&
param
.
colIdx
);
CHECK_CODE
(
parseValueToken
(
&
pCxt
->
pSql
,
&
sToken
,
pSchema
,
timePrec
,
tmpTokenBuf
,
MemRowAppend
,
&
param
,
&
pCxt
->
msg
));
if
(
PRIMARYKEY_TIMESTAMP_COL_ID
==
pSchema
->
colId
)
{
TSKEY
tsKey
=
memRowKey
(
row
);
TSKEY
tsKey
=
TD_ROW_KEY
(
row
);
if
(
checkTimestamp
(
pDataBlocks
,
(
const
char
*
)
&
tsKey
)
!=
TSDB_CODE_SUCCESS
)
{
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"client time/server time can not be mixed up"
,
sToken
.
z
);
return
TSDB_CODE_TSC_INVALID_TIME_STAMP
;
...
...
@@ -437,17 +440,17 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
if
(
!
isParseBindParam
)
{
// set the null value for the columns that do not assign values
if
((
spd
->
numOfBound
<
spd
->
numOfCols
)
&&
isDataRow
(
row
))
{
SDataRow
dataRow
=
memRowDataBody
(
row
);
if
((
spd
->
numOfBound
<
spd
->
numOfCols
)
&&
TD_IS_TP_ROW
(
row
))
{
for
(
int32_t
i
=
0
;
i
<
spd
->
numOfCols
;
++
i
)
{
if
(
spd
->
cols
[
i
].
valStat
==
VAL_STAT_NONE
)
{
tdAppendDataColVal
(
dataRow
,
getNullValue
(
schema
[
i
].
type
),
true
,
schema
[
i
].
type
,
spd
->
cols
[
i
].
toffset
);
if
(
spd
->
cols
[
i
].
valStat
==
VAL_STAT_NONE
)
{
// the primary TS key is not VAL_STAT_NONE
tdAppendColValToTpRow
(
pBuilder
,
TD_VTYPE_NONE
,
getNullValue
(
schema
[
i
].
type
),
true
,
schema
[
i
].
type
,
i
,
spd
->
cols
[
i
].
toffset
);
}
}
}
}
*
len
=
pBuilder
->
r
owSize
;
*
len
=
pBuilder
->
extendedR
owSize
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -455,7 +458,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
static
int32_t
parseValues
(
SInsertParseContext
*
pCxt
,
STableDataBlocks
*
pDataBlock
,
int
maxRows
,
int32_t
*
numOfRows
)
{
STableComInfo
tinfo
=
getTableInfo
(
pDataBlock
->
pTableMeta
);
int32_t
extendedRowSize
=
getExtendedRowSize
(
pDataBlock
);
CHECK_CODE
(
init
MemRowBuilder
(
&
pDataBlock
->
rowBuilder
,
0
,
&
pDataBlock
->
boundColumnInfo
));
CHECK_CODE
(
init
RowBuilder
(
&
pDataBlock
->
rowBuilder
,
pDataBlock
->
pTableMeta
->
sversion
,
&
pDataBlock
->
boundColumnInfo
));
(
*
numOfRows
)
=
0
;
char
tmpTokenBuf
[
TSDB_MAX_BYTES_PER_ROW
]
=
{
0
};
// used for deleting Escape character: \\, \', \"
...
...
source/libs/parser/src/parserUtil.c
浏览文件 @
d71fece2
...
...
@@ -518,7 +518,7 @@ static void createInputDataFilterInfo(SQueryStmtInfo* px, int32_t numOfCol1, int
//
// if (IS_RAW_PAYLOAD(insertParam->payloadType)) {
// for (int32_t i = 0; i < numOfRows; ++i) {
// S
MemRow memRow = (SMemRow
)pDataBlock;
// S
TSRow* memRow = (STSRow*
)pDataBlock;
// memRowSetType(memRow, SMEM_ROW_DATA);
// SDataRow trow = memRowDataBody(memRow);
// dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen));
...
...
@@ -531,13 +531,13 @@ static void createInputDataFilterInfo(SQueryStmtInfo* px, int32_t numOfCol1, int
// p += pSchema[j].bytes;
// }
//
// pDataBlock = (char*)pDataBlock +
memRowTLen
(memRow);
// pBlock->dataLen +=
memRowTLen
(memRow);
// pDataBlock = (char*)pDataBlock +
TD_ROW_LEN
(memRow);
// pBlock->dataLen +=
TD_ROW_LEN
(memRow);
// }
// } else {
// for (int32_t i = 0; i < numOfRows; ++i) {
// char* payload = (blkKeyTuple + i)->payloadAddr;
// TDRowLenT rowTLen =
memRowTLen
(payload);
// TDRowLenT rowTLen =
TD_ROW_LEN
(payload);
// memcpy(pDataBlock, payload, rowTLen);
// pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
// pBlock->dataLen += rowTLen;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录