Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cea242bf
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
cea242bf
编写于
1月 24, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add colValToSRow
上级
e5885caf
变更
6
显示空白变更内容
内联
并排
Showing
6 changed file
with
1293 addition
and
140 deletion
+1293
-140
include/common/tdataformat.h
include/common/tdataformat.h
+32
-73
include/common/trow.h
include/common/trow.h
+382
-65
include/util/types.h
include/util/types.h
+1
-1
source/common/src/tdataformat.c
source/common/src/tdataformat.c
+4
-0
source/common/src/trow.c
source/common/src/trow.c
+870
-0
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+4
-1
未找到文件。
include/common/tdataformat.h
浏览文件 @
cea242bf
...
...
@@ -24,6 +24,9 @@
extern
"C"
{
#endif
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP
#define STR_TO_VARSTR(x, str) \
do { \
VarDataLenT __len = (VarDataLenT)strlen(str); \
...
...
@@ -118,6 +121,10 @@ typedef struct {
STColumn
*
columns
;
}
STSchemaBuilder
;
#define TD_VTYPE_BITS 2 // val type
#define TD_VTYPE_PARTS 4 // 8 bits / TD_VTYPE_BITS = 4
#define TD_VTYPE_OPTR 3 // TD_VTYPE_PARTS - 1, utilize to get remainder
int
tdInitTSchemaBuilder
(
STSchemaBuilder
*
pBuilder
,
int32_t
version
);
void
tdDestroyTSchemaBuilder
(
STSchemaBuilder
*
pBuilder
);
void
tdResetTSchemaBuilder
(
STSchemaBuilder
*
pBuilder
,
int32_t
version
);
...
...
@@ -125,6 +132,8 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId
STSchema
*
tdGetSchemaFromBuilder
(
STSchemaBuilder
*
pBuilder
);
// ----------------- Semantic timestamp key definition
#ifdef TD_2 .0
typedef
uint64_t
TKEY
;
#define TKEY_INVALID UINT64_MAX
...
...
@@ -144,6 +153,29 @@ typedef uint64_t TKEY;
#define TD_TO_TKEY(key) tdGetTKEY(((key) < MIN_TS_KEY) ? MIN_TS_KEY : (((key) > MAX_TS_KEY) ? MAX_TS_KEY : key))
#else
typedef
uint64_t
TKEY
;
#define TKEY_INVALID UINT64_MAX
#define TKEY_NULL TKEY_INVALID
#define TKEY_NEGATIVE_FLAG (((TKEY)1) << 63)
#define TKEY_DELETE_FLAG (((TKEY)1) << 62)
#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG | TKEY_DELETE_FLAG))
#define TKEY_IS_NEGATIVE(tkey) (((tkey)&TKEY_NEGATIVE_FLAG) != 0)
#define TKEY_IS_DELETED(tkey) (((tkey)&TKEY_DELETE_FLAG) != 0)
#define tdSetTKEYDeleted(tkey) ((tkey) | TKEY_DELETE_FLAG)
#define tdGetTKEY(key) (((TKEY)ABS(key)) | (TKEY_NEGATIVE_FLAG & (TKEY)(key)))
#define tdGetKey(tkey) (((TSKEY)((tkey)&TKEY_VALUE_FILTER)) * (TKEY_IS_NEGATIVE(tkey) ? -1 : 1))
#define MIN_TS_KEY ((TSKEY)0x8000000000000001)
#define MAX_TS_KEY ((TSKEY)0x7fffffffffffffff)
#define TD_TO_TKEY(key) tdGetTKEY(((key) < MIN_TS_KEY) ? MIN_TS_KEY : (((key) > MAX_TS_KEY) ? MAX_TS_KEY : key))
#endif
static
FORCE_INLINE
TKEY
keyToTkey
(
TSKEY
key
)
{
TSKEY
lkey
=
key
;
if
(
key
>
MAX_TS_KEY
)
{
...
...
@@ -718,31 +750,6 @@ static FORCE_INLINE int32_t tdGetColAppendLen(uint8_t rowType, const void *value
return
len
;
}
/**
* 1. calculate the delta of AllNullLen for SDataRow.
* 2. calculate the real len for SKVRow.
*/
static
FORCE_INLINE
void
tdGetColAppendDeltaLen
(
const
void
*
value
,
int8_t
colType
,
int32_t
*
dataLen
,
int32_t
*
kvLen
)
{
switch
(
colType
)
{
case
TSDB_DATA_TYPE_BINARY
:
{
int32_t
varLen
=
varDataLen
(
value
);
*
dataLen
+=
(
varLen
-
CHAR_BYTES
);
*
kvLen
+=
(
varLen
+
sizeof
(
SColIdx
));
break
;
}
case
TSDB_DATA_TYPE_NCHAR
:
{
int32_t
varLen
=
varDataLen
(
value
);
*
dataLen
+=
(
varLen
-
TSDB_NCHAR_SIZE
);
*
kvLen
+=
(
varLen
+
sizeof
(
SColIdx
));
break
;
}
default:
{
*
kvLen
+=
(
TYPE_BYTES
[
colType
]
+
sizeof
(
SColIdx
));
break
;
}
}
}
typedef
struct
{
int16_t
colId
;
uint8_t
colType
;
...
...
@@ -757,54 +764,6 @@ static FORCE_INLINE void setSColInfo(SColInfo *colInfo, int16_t colId, uint8_t c
SMemRow
mergeTwoMemRows
(
void
*
buffer
,
SMemRow
row1
,
SMemRow
row2
,
STSchema
*
pSchema1
,
STSchema
*
pSchema2
);
#if 0
// ----------------- Raw payload structure for row:
/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->|
* | |<----------------- flen ------------->|<--- value part --->|
* |SMemRowType| dataTLen | nCols | colId | colType | offset | ... | value |...|...|... |
* +-----------+----------+----------+--------------------------------------|--------------------|
* | uint8_t | uint32_t | uint16_t | int16_t | uint8_t | uint16_t | ... |.......|...|...|... |
* +-----------+----------+----------+--------------------------------------+--------------------|
* 1. offset in column data tuple starts from the value part in case of uint16_t overflow.
* 2. dataTLen: total length including the header and body.
*/
#define PAYLOAD_NCOLS_LEN sizeof(uint16_t)
#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowLenT))
#define PAYLOAD_HEADER_LEN (PAYLOAD_NCOLS_OFFSET + PAYLOAD_NCOLS_LEN)
#define PAYLOAD_ID_LEN sizeof(int16_t)
#define PAYLOAD_ID_TYPE_LEN (sizeof(int16_t) + sizeof(uint8_t))
#define PAYLOAD_COL_HEAD_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(uint16_t))
#define PAYLOAD_PRIMARY_COL_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(TSKEY))
#define payloadBody(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN)
#define payloadType(r) (*(uint8_t *)(r))
#define payloadSetType(r, t) (payloadType(r) = (t))
#define payloadTLen(r) (*(TDRowLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header
#define payloadSetTLen(r, l) (payloadTLen(r) = (l))
#define payloadNCols(r) (*(TDRowLenT *)POINTER_SHIFT(r, PAYLOAD_NCOLS_OFFSET))
#define payloadSetNCols(r, n) (payloadNCols(r) = (n))
#define payloadValuesOffset(r) \
(PAYLOAD_HEADER_LEN + payloadNCols(r) * PAYLOAD_COL_HEAD_LEN) // avoid using the macro in loop
#define payloadValues(r) POINTER_SHIFT(r, payloadValuesOffset(r)) // avoid using the macro in loop
#define payloadColId(c) (*(int16_t *)(c))
#define payloadColType(c) (*(uint8_t *)POINTER_SHIFT(c, PAYLOAD_ID_LEN))
#define payloadColOffset(c) (*(uint16_t *)POINTER_SHIFT(c, PAYLOAD_ID_TYPE_LEN))
#define payloadColValue(c) POINTER_SHIFT(c, payloadColOffset(c))
#define payloadColSetId(c, i) (payloadColId(c) = (i))
#define payloadColSetType(c, t) (payloadColType(c) = (t))
#define payloadColSetOffset(c, o) (payloadColOffset(c) = (o))
#define payloadTSKey(r) (*(TSKEY *)POINTER_SHIFT(r, payloadValuesOffset(r)))
#define payloadTKey(r) (*(TKEY *)POINTER_SHIFT(r, payloadValuesOffset(r)))
#define payloadKey(r) tdGetKey(payloadTKey(r))
static FORCE_INLINE char *payloadNextCol(char *pCol) { return (char *)POINTER_SHIFT(pCol, PAYLOAD_COL_HEAD_LEN); }
#endif
#ifdef __cplusplus
}
#endif
...
...
include/common/trow.h
浏览文件 @
cea242bf
...
...
@@ -17,101 +17,64 @@
#define _TD_COMMON_ROW_H_
#include "os.h"
#include "tdef.h"
#include "taoserror.h"
#include "talgo.h"
#include "tbuffer.h"
#include "tdataformat.h"
#include "tdef.h"
#include "tschema.h"
#include "ttypes.h"
#ifdef __cplusplus
extern
"C"
{
#endif
#define TD_SUPPORT_NONE_VAL
// Target of trow.h:
// 1. Row related definition in dataformat.h of 2.0 could be replaced with trow.h of 3.0.
// 2. The basic definition in dataformat.h is shared with trow.h of 3.0.
// row type
#define TD_ROW_TP 0 // default
#define TD_ROW_KV 1
#define TD_ROW_TP 0
x0
// default
#define TD_ROW_KV
0x0
1
// val type
#define TD_VTYPE_NORM 0x0 // normal val: not none, not null
#define TD_VTYPE_NONE 0x01 // none or unknown/undefined
#define TD_VTYPE_NULL 0x02 // null val
#ifdef TD_SUPPORT_NONE_VAL
#define isSelectKVRow(klen, tlen) ((klen) < (tlen))
#ifdef TD_SUPPORT_BITMAP
static
FORCE_INLINE
bool
tdValTypeIsNorm
(
int8_t
valType
)
{
return
(
valType
&
TD_VTYPE_NORM
);
}
static
FORCE_INLINE
bool
tdValTypeIsNone
(
int8_t
valType
)
{
return
(
valType
&
TD_VTYPE_NONE
);
}
static
FORCE_INLINE
bool
tdValTypeIsNull
(
int8_t
valType
)
{
return
(
valType
&
TD_VTYPE_NULL
);
}
#else
#endif
static
FORCE_INLINE
bool
tdValIsNorm
(
int8_t
valType
,
const
void
*
val
,
int32_t
t
ype
)
{
#ifdef TD_SUPPORT_
NONE_VAL
static
FORCE_INLINE
bool
tdValIsNorm
(
int8_t
valType
,
const
void
*
val
,
int32_t
colT
ype
)
{
#ifdef TD_SUPPORT_
BITMAP
return
tdValTypeIsNorm
(
valType
);
#else
return
!
isNull
(
val
,
t
ype
);
return
!
isNull
(
val
,
colT
ype
);
#endif
}
static
FORCE_INLINE
bool
tdValIsNone
(
int8_t
valType
)
{
#ifdef TD_SUPPORT_
NONE_VAL
#ifdef TD_SUPPORT_
BITMAP
return
tdValTypeIsNone
(
valType
);
#else
return
false
;
#endif
}
static
FORCE_INLINE
bool
tdValIsNull
(
int8_t
valType
,
const
void
*
val
,
int32_t
type
)
{
#ifdef TD_SUPPORT_NONE_VAL
static
FORCE_INLINE
bool
tdValIsNull
(
int8_t
valType
,
const
void
*
val
,
int32_t
colType
)
{
#ifdef TD_SUPPORT_BITMAP
return
tdValTypeIsNull
(
valType
);
#else
return
isNull
(
val
,
t
ype
);
return
isNull
(
val
,
colT
ype
);
#endif
}
#define TD_ROW_LEN(r)
#define TD_ROW_TLEN(r)
#define TD_ROW_TYPE(r)
#define TD_ROW_BODY(r)
#define TD_ROW_TKEY(r)
#define TD_ROW_KEY(r)
#define TD_ROW_DELETED(r)
#define TD_ROW_VERSION(r)
#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s)
#define TD_ROW_SET_LEN(r, l)
#define TD_ROW_SET_VERSION(r, v)
#define TD_ROW_CPY(dst, r)
// ----------------- SRow appended with tuple row structure(STpRow)
/*
* |---------|------------------------------------------------- len -------------------------------------->|
* |<-------- Head ------>|<--------- flen ------------->|<-- blen -->| |
* |---------+---------------------+---------------------------------+-------------+-----------------------+
* | uint8_t | uint32_t | int16_t | | | |
* |---------+----------+----------+---------------------------------+-------------------------------------+
* | flag | len | sversion |(key) first part | bitmap | second part |
* +---------+----------+----------+---------------------------------+-------------------------------------+
*
* NOTE: Timestamp in this row structure is TKEY instead of TSKEY
* Use 2 bits in bitmap for each column
* flag:
* 0: flag&0x01 0 STpRow, 1 SKvRow // since 2.0
* 1: flag&0x02 0 without bitmap, 1 with bitmap. // 如果None值支持数据库或者更小维度,则需要指定一个bit区分。TODO
* 2: endian(0 big endian, 1 little endian)
* 3-7: reserved(value 0)
*/
// ----------------- SRow appended with K-V row structure(SKvRow)
/* |--------------------|------------------------------------------------ len --------------------------->|
* |<-------- Head ---->|<--------- colsIdxLen ---------->|<-- blen -->| |
* |--------------------+----------+-----------------------------------------------------------------------+
* | uint8_t | uint32_t | int16_t | | | |
* |---------+----------+----------+-----------------------------------------------------------------------+
* | flag | len | ncols |(keyColId) cols index | bitmap | data part |
* |---------+----------+----------+---------------------------------+-------------------------------------+
*
* NOTE: Timestamp in this row structure is TKEY instead of TSKEY
*/
typedef
void
*
SRow
;
typedef
struct
{
...
...
@@ -140,32 +103,385 @@ typedef struct {
struct
{
/// row type
uint32_t
type
:
2
;
/// row schema version
uint32_t
sver
:
16
;
/// is delete row
/// is delete row(0 not delete, 1 delete)
uint32_t
del
:
1
;
/// endian(0 little endian, 1 big endian)
uint32_t
endian
:
1
;
/// reserved for back compatibility
uint32_t
reserve
:
13
;
uint32_t
reserve
:
12
;
/// row schema version
uint32_t
sver
:
16
;
};
};
/// row total length
uint32_t
len
;
/// nCols of SRow(only valid for K-V row)
uint64_t
ncols
:
16
;
/// row version
uint64_t
ver
;
uint64_t
ver
:
48
;
/// timestamp
TSKEY
ts
;
/// the inline data, maybe a tuple or a k-v tuple
char
data
[];
}
STSRow
;
#define TD_ROW_HEAD_LEN (sizeof(STSRow))
#define TD_ROW_TYPE(r) ((r)->type)
#define TD_ROW_DELETE(r) ((r)->del)
#define TD_ROW_ENDIAN(r) ((r)->endian)
#define TD_ROW_SVER(r) ((r)->sver)
#define TD_ROW_NCOLS(r) ((r)->ncols)
#define TD_ROW_DATA(r) ((r)->data)
#define TD_ROW_LEN(r) ((r)->len)
#define TD_ROW_TSKEY(r) ((r)->ts)
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and (int)ceil((double)nCols/TD_VTYPE_PARTS)
// should be added if TD_SUPPORT_BITMAP defined.
#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) (schemaTLen(s) + TD_ROW_HEAD_LEN)
#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t))
#define TD_ROW_SET_DELETE(r) (TD_ROW_DELETE(r) = 1)
#define TD_ROW_SET_SVER(r, v) (TD_ROW_SVER(r) = (v))
#define TD_ROW_SET_LEN(r, l) (TD_ROW_LEN(r) = (l))
#define TD_ROW_CPY(dst, r) memcpy((dst), (r), TD_ROW_LEN(r))
#define TD_ROW_IS_DELETED(r) (TD_ROW_DELETE(r))
#define TD_IS_TP_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_TP)
#define TD_IS_KV_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_KV)
#define TD_BOOL_STR(b) ((b) ? "true" : "false")
#define isUtilizeKVRow(k, d) ((k) < ((d)*KVRatioConvert))
#define TD_ROW_OFFSET(p) ((p)->toffset);
#ifdef TD_SUPPORT_BITMAP
static
FORCE_INLINE
void
*
tdRowBitmap
(
STSRow
*
pRow
,
uint16_t
flen
)
{
switch
(
pRow
->
type
)
{
case
TD_ROW_TP
:
return
POINTER_SHIFT
(
pRow
->
data
,
flen
);
case
TD_ROW_KV
:
return
POINTER_SHIFT
(
pRow
->
data
,
pRow
->
ncols
*
sizeof
(
SKvRowIdx
));
default:
break
;
}
return
NULL
;
}
#endif
STpRow
tdNewTpRowFromSchema
(
STSchema
*
pSchema
);
void
tdFreeTpRow
(
STpRow
row
);
void
tdInitTpRow
(
STpRow
row
,
STSchema
*
pSchema
);
STpRow
tdTpRowDup
(
STpRow
row
);
// ----------------- Tuple row structure(STpRow)
/*
* |<----------------------------- tlen ---------------------------------->|
* |<--------- flen ------------->|<-- blen -->| |
* +---------------------------------+-------------+-----------------------+
* | | | |
* +---------------------------------+-------------------------------------+
* | first part | bitmap | second part |
* +---------------------------------+-------------+-----------------------+
*
*/
// ----------------- K-V row structure(SKvRow)
/*
* |<--------- colsIdxLen ---------->|<-- blen -->| |
* +---------------------------------+------------+------------------------+
* | | | |
* +-----------------------------------------------------------------------+
* | cols index | bitmap | data part |
* +---------------------------------+------------+------------------------+
*
*/
typedef
struct
{
// necessary info
int8_t
rowType
;
int16_t
sver
;
STSRow
*
pBuf
;
// auxiliary info
int32_t
flen
;
int16_t
nBoundCols
;
int16_t
nCols
;
int16_t
nBitmaps
;
int16_t
nBoundBitmaps
;
int32_t
offset
;
void
*
pBitmap
;
void
*
pOffset
;
}
SRowBuilder
;
/**
* @brief
*
* @param pBuilder
* @param sversion schema version
* @return int32_t
*/
int32_t
tdSRowInit
(
SRowBuilder
*
pBuilder
,
int16_t
sversion
)
{
pBuilder
->
rowType
=
TD_ROW_TP
;
pBuilder
->
sver
=
sversion
;
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief 一般不建议使用,除非特殊情况
*
* @param pBuilder
* @param rowType
* @return FORCE_INLINE
*/
static
FORCE_INLINE
void
tdSRowSetRowType
(
SRowBuilder
*
pBuilder
,
int8_t
rowType
)
{
pBuilder
->
rowType
=
rowType
;
}
/**
* @brief 用于判定采用STpRow/SKvRow,
*
* @param pBuilder
* @param allNullLen 无法获取则填写-1
* @param boundNullLen 无法获取则填写-1
* @param nCols
* @param nBoundCols
* @param flen
* @return FORCE_INLINE
*/
static
FORCE_INLINE
int32_t
tdSRowSetExtendedInfo
(
SRowBuilder
*
pBuilder
,
int32_t
allNullLen
,
int32_t
boundNullLen
,
int32_t
nCols
,
int32_t
nBoundCols
,
int32_t
flen
)
{
if
((
boundNullLen
>
0
)
&&
(
allNullLen
>
0
)
&&
isSelectKVRow
(
boundNullLen
,
allNullLen
))
{
pBuilder
->
rowType
=
TD_ROW_KV
;
}
pBuilder
->
nBoundCols
=
nBoundCols
;
pBuilder
->
nCols
=
nCols
;
pBuilder
->
flen
=
flen
;
if
(
pBuilder
->
flen
<=
0
||
pBuilder
->
nCols
<=
0
||
pBuilder
->
nBoundCols
<=
0
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
TSDB_CODE_INVALID_PARA
;
}
#ifdef TD_SUPPORT_BITMAP
pBuilder
->
nBitmaps
=
(
int16_t
)
ceil
((
double
)
pBuilder
->
nCols
/
TD_VTYPE_PARTS
);
pBuilder
->
nBoundBitmaps
=
(
int16_t
)
ceil
((
double
)
pBuilder
->
nBoundCols
/
TD_VTYPE_PARTS
);
#else
pBuilder
->
nBitmaps
=
0
;
pBuilder
->
nBoundBitmaps
=
0
;
#endif
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief 在pBuf位置生成SRow
*
* @param pBuilder
* @param pBuf
*/
int32_t
tdSRowResetBuf
(
SRowBuilder
*
pBuilder
,
void
*
pBuf
)
{
pBuilder
->
pBuf
=
pBuf
;
if
(
!
pBuilder
->
pBuf
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
TSDB_CODE_INVALID_PARA
;
}
switch
(
pBuilder
->
rowType
)
{
case
TD_ROW_TP
:
#ifdef TD_SUPPORT_BITMAP
pBuilder
->
pBitmap
=
POINTER_SHIFT
(
pBuilder
->
pBuf
,
pBuilder
->
flen
);
#endif
uint32_t
len
=
TD_ROW_HEAD_LEN
+
pBuilder
->
flen
+
pBuilder
->
nBitmaps
;
TD_ROW_SET_LEN
(
pBuilder
->
pBuf
,
len
);
TD_ROW_SET_SVER
(
pBuilder
->
pBuf
,
pBuilder
->
sver
);
break
;
case
TD_ROW_KV
:
uint32_t
len
=
pBuilder
->
nBoundCols
*
sizeof
(
SKvRowIdx
);
#ifdef TD_SUPPORT_BITMAP
pBuilder
->
pBitmap
=
POINTER_SHIFT
(
pBuilder
->
pBuf
,
len
);
#endif
len
+=
(
TD_ROW_HEAD_LEN
+
pBuilder
->
nBoundBitmaps
);
TD_ROW_SET_LEN
(
pBuilder
->
pBuf
,
len
);
TD_ROW_SET_SVER
(
pBuilder
->
pBuf
,
pBuilder
->
sver
);
break
;
default:
terrno
=
TSDB_CODE_INVALID_PARA
;
return
TSDB_CODE_INVALID_PARA
;
}
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief 由调用方管理存储空间的分配及释放,一次输入多个参数
*
* @param pBuilder
* @param pBuf
* @param allNullLen
* @param boundNullLen
* @param nCols
* @param nBoundCols
* @param flen
* @return FORCE_INLINE
*/
static
FORCE_INLINE
int32_t
tdSRowInitEx
(
SRowBuilder
*
pBuilder
,
void
*
pBuf
,
uint32_t
allNullLen
,
uint32_t
boundNullLen
,
int32_t
nCols
,
int32_t
nBoundCols
,
int32_t
flen
)
{
tdSRowSetExtendedInfo
(
pBuilder
,
allNullLen
,
boundNullLen
,
nCols
,
nBoundCols
,
flen
);
return
tdSRowResetBuf
(
pBuilder
,
pBuf
);
}
/**
* @brief
*
* @param pBuilder
*/
void
tdSRowReset
(
SRowBuilder
*
pBuilder
)
{
pBuilder
->
rowType
=
TD_ROW_TP
;
pBuilder
->
pBuf
=
NULL
;
pBuilder
->
nBoundCols
=
-
1
;
pBuilder
->
nCols
=
-
1
;
pBuilder
->
flen
=
-
1
;
pBuilder
->
pBitmap
=
NULL
;
}
/**
* @brief
*
* @param pBuilder
* @param colId start from PRIMARYKEY_TIMESTAMP_COL_ID
* @param colType
* @param val
* @param valType
* @param offset
* @param idx sorted column index, start from 0
* @return FORCE_INLINE
*/
static
FORCE_INLINE
int32_t
tdAppendColValToRow
(
SRowBuilder
*
pBuilder
,
int16_t
colId
,
int8_t
colType
,
const
void
*
val
,
int8_t
valType
,
int32_t
offset
,
int16_t
idx
)
{
STSRow
*
pRow
=
pBuilder
->
pBuf
;
void
*
pBitmap
=
NULL
;
if
(
!
val
)
{
#ifdef TD_SUPPORT_BITMAP
if
(
tdValIsNorm
(
valType
,
val
,
colType
))
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
terrno
;
}
#else
terrno
=
TSDB_CODE_INVALID_PTR
;
return
terrno
;
#endif
}
// TS KEY is stored in STSRow.ts and not included in STSRow.data field.
if
(
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
TD_ROW_TSKEY
(
pRow
)
=
*
(
TSKEY
*
)
val
;
#ifdef TD_SUPPORT_BITMAP
pBitmap
=
tdRowBitmap
(
pRow
,
pBuilder
->
flen
);
if
(
tdSetBitmap
(
pBitmap
,
idx
,
valType
)
!=
TSDB_CODE_SUCCESS
)
{
return
terrno
;
}
#endif
return
TSDB_CODE_SUCCESS
;
}
// TODO: We can avoid the type judegement by FP, but would prevent the inline scheme.
// typedef int (*tdAppendColValToSRowFp)(STSRow *pRow, void *pBitmap, int16_t colId, int8_t colType,
// const void *val, int8_t valType, int32_t tOffset, int16_t tIdx);
if
(
TD_IS_TP_ROW
(
pRow
))
{
tdAppendColValToTpRow
(
pRow
->
data
,
pBitmap
,
val
,
true
,
colType
,
valType
,
idx
,
offset
);
}
else
{
tdAppendColValToKvRow
(
pRow
->
data
,
pBitmap
,
val
,
true
,
colType
,
valType
,
idx
,
offset
,
colId
);
}
return
TSDB_CODE_SUCCESS
;
}
static
FORCE_INLINE
int
tdAppendColValToTpRow
(
STSRow
*
row
,
void
*
pBitmap
,
const
void
*
val
,
bool
isCopyVarData
,
int8_t
colType
,
int8_t
valType
,
int16_t
idx
,
int32_t
offset
)
{
ASSERT
(
offset
>=
sizeof
(
TSDB_DATA_TYPE_TIMESTAMP
));
if
(
!
tdValIsNone
(
valType
))
{
if
(
IS_VAR_DATA_TYPE
(
colType
))
{
// ts key stored in STSRow.ts
*
(
VarDataOffsetT
*
)
POINTER_SHIFT
(
row
->
data
,
offset
-
sizeof
(
TSDB_DATA_TYPE_TIMESTAMP
))
=
TD_ROW_LEN
(
row
);
if
(
isCopyVarData
)
{
memcpy
(
POINTER_SHIFT
(
row
,
TD_ROW_LEN
(
row
)),
val
,
varDataTLen
(
val
));
}
TD_ROW_LEN
(
row
)
+=
varDataTLen
(
val
);
}
else
{
memcpy
(
POINTER_SHIFT
(
row
->
data
,
offset
-
sizeof
(
TSDB_DATA_TYPE_TIMESTAMP
)),
val
,
TYPE_BYTES
[
colType
]);
}
}
#ifdef TD_SUPPORT_BITMAP
tdSetBitmap
(
pBitmap
,
idx
,
valType
);
#endif
return
0
;
}
static
FORCE_INLINE
int
tdAppendColValToKvRow
(
STSRow
*
row
,
void
*
pBitmap
,
const
void
*
val
,
bool
isCopyValData
,
int8_t
colType
,
int8_t
valType
,
int16_t
idx
,
int32_t
offset
,
int16_t
colId
)
{
ASSERT
(
offset
>=
sizeof
(
SKvRowIdx
));
if
(
!
tdValIsNone
(
valType
))
{
// ts key stored in STSRow.ts
SColIdx
*
pColIdx
=
(
SColIdx
*
)
POINTER_SHIFT
(
row
->
data
,
offset
-
sizeof
(
SKvRowIdx
));
char
*
ptr
=
(
char
*
)
POINTER_SHIFT
(
row
,
TD_ROW_LEN
(
row
));
pColIdx
->
colId
=
colId
;
pColIdx
->
offset
=
TD_ROW_LEN
(
row
);
// the offset include the TD_ROW_HEAD_LEN
if
(
IS_VAR_DATA_TYPE
(
colType
))
{
if
(
isCopyValData
)
{
memcpy
(
ptr
,
val
,
varDataTLen
(
val
));
}
TD_ROW_LEN
(
row
)
+=
varDataTLen
(
val
);
}
else
{
memcpy
(
ptr
,
val
,
TYPE_BYTES
[
colType
]);
TD_ROW_LEN
(
row
)
+=
TYPE_BYTES
[
colType
];
}
}
#ifdef TD_SUPPORT_BITMAP
tdSetBitmap
(
pBitmap
,
idx
,
valType
);
#endif
return
0
;
}
static
FORCE_INLINE
int32_t
tdSetBitmap
(
void
*
pBitmap
,
int16_t
tIdx
,
int8_t
valType
)
{
if
(
!
pBitmap
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
terrno
;
}
int16_t
nBytes
=
tIdx
/
TD_VTYPE_PARTS
;
int16_t
nOffset
=
tIdx
&
TD_VTYPE_OPTR
;
char
*
pDestByte
=
(
char
*
)
POINTER_SHIFT
(
pBitmap
,
nBytes
);
switch
(
nOffset
)
{
case
0
:
*
pDestByte
=
((
*
pDestByte
)
&
0x3F
)
|
(
valType
<<
6
);
break
;
case
1
:
*
pDestByte
=
((
*
pDestByte
)
&
0xCF
)
|
(
valType
<<
4
);
break
;
case
2
:
*
pDestByte
=
((
*
pDestByte
)
&
0xF3
)
|
(
valType
<<
2
);
break
;
case
3
:
*
pDestByte
=
((
*
pDestByte
)
&
0xFC
)
|
valType
;
break
;
default:
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
return
TSDB_CODE_SUCCESS
;
}
#ifdef TROW_ORIGIN_HZ
typedef
struct
{
uint32_t
nRows
;
char
rows
[];
}
STSRowBatch
;
static
void
tdSRowPrint
(
STSRow
*
row
)
{
printf
(
"type:%d, del:%d, sver:%d
\n
"
,
row
->
type
,
row
->
del
,
row
->
sver
);
printf
(
"isDeleted:%s, isTpRow:%s, isKvRow:%s
\n
"
,
TD_BOOL_STR
(
TD_ROW_IS_DELETED
(
row
)),
TD_BOOL_STR
(
TD_IS_TP_ROW
(
row
)),
TD_BOOL_STR
(
TD_IS_KV_ROW
(
row
)));
}
typedef
enum
{
///
ordinary
row builder
TD_
OR
_ROW_BUILDER
=
0
,
///
tuple
row builder
TD_
TP
_ROW_BUILDER
=
0
,
/// kv row builder
TD_KV_ROW_BUILDER
,
/// self-determined row builder
...
...
@@ -208,6 +524,7 @@ int tRowReaderRead(STSRowReader *pRowReader, col_id_t cid, void *target, uint64_
#define tRowBatchIterInit(pRB) \
{ .it = 0, .pRowBatch = (pRB) }
const
STSRow
*
tRowBatchIterNext
(
STSRowBatchIter
*
pRowBatchIter
);
#endif
#ifdef __cplusplus
}
...
...
include/util/types.h
浏览文件 @
cea242bf
...
...
@@ -72,7 +72,7 @@ static FORCE_INLINE double taos_align_get_double(const char *pBuf) {
#define SET_DOUBLE_PTR(x, y) { (*(double *)(x)) = (*(double *)(y)); }
// #endif
typedef
int16_t
VarDataLenT
;
// maxVarDataLen: 32767
typedef
u
int16_t
VarDataLenT
;
// maxVarDataLen: 32767
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
#define varDataLen(v) ((VarDataLenT *)(v))[0]
...
...
source/common/src/tdataformat.c
浏览文件 @
cea242bf
...
...
@@ -185,6 +185,10 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
schemaFLen
(
pSchema
)
=
pBuilder
->
flen
;
schemaVLen
(
pSchema
)
=
pBuilder
->
vlen
;
#ifdef TD_SUPPORT_BITMAP
schemaTLen
(
pSchema
)
+=
(
int
)
ceil
((
double
)
schemaNCols
(
pSchema
)
/
TD_VTYPE_PARTS
);
#endif
memcpy
(
schemaColAt
(
pSchema
,
0
),
pBuilder
->
columns
,
sizeof
(
STColumn
)
*
pBuilder
->
nCols
);
return
pSchema
;
...
...
source/common/src/trow.c
浏览文件 @
cea242bf
...
...
@@ -32,4 +32,874 @@ int trbWriteCol(SRowBuilder *pRB, void *pData, col_id_t cid) {
// TODO
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdataformat.h"
#include "ulog.h"
#include "talgo.h"
#include "tcoding.h"
#include "wchar.h"
#include "tarray.h"
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows, bool forceSetNull);
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
int spaceNeeded = pCol->bytes * maxPoints;
if(IS_VAR_DATA_TYPE(pCol->type)) {
spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
}
if(pCol->spaceSize < spaceNeeded) {
void* ptr = realloc(pCol->pData, spaceNeeded);
if(ptr == NULL) {
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded,
strerror(errno));
return -1;
} else {
pCol->pData = ptr;
pCol->spaceSize = spaceNeeded;
}
}
if(IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
}
return 0;
}
/**
* Duplicate the schema and return a new object
*/
STSchema *tdDupSchema(const STSchema *pSchema) {
int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema);
STSchema *tSchema = (STSchema *)malloc(tlen);
if (tSchema == NULL) return NULL;
memcpy((void *)tSchema, (void *)pSchema, tlen);
return tSchema;
}
/**
* Encode a schema to dst, and return the next pointer
*/
int tdEncodeSchema(void **buf, STSchema *pSchema) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema));
tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema));
for (int i = 0; i < schemaNCols(pSchema); i++) {
STColumn *pCol = schemaColAt(pSchema, i);
tlen += taosEncodeFixedI8(buf, colType(pCol));
tlen += taosEncodeFixedI16(buf, colColId(pCol));
tlen += taosEncodeFixedI16(buf, colBytes(pCol));
}
return tlen;
}
/**
* Decode a schema from a binary.
*/
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
int version = 0;
int numOfCols = 0;
STSchemaBuilder schemaBuilder;
buf = taosDecodeFixedI32(buf, &version);
buf = taosDecodeFixedI32(buf, &numOfCols);
if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;
for (int i = 0; i < numOfCols; i++) {
int8_t type = 0;
int16_t colId = 0;
int16_t bytes = 0;
buf = taosDecodeFixedI8(buf, &type);
buf = taosDecodeFixedI16(buf, &colId);
buf = taosDecodeFixedI16(buf, &bytes);
if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL;
}
}
*pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
tdDestroyTSchemaBuilder(&schemaBuilder);
return buf;
}
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
if (pBuilder == NULL) return -1;
pBuilder->tCols = 256;
pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols);
if (pBuilder->columns == NULL) return -1;
tdResetTSchemaBuilder(pBuilder, version);
return 0;
}
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) {
if (pBuilder) {
tfree(pBuilder->columns);
}
}
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
pBuilder->nCols = 0;
pBuilder->tlen = 0;
pBuilder->flen = 0;
pBuilder->vlen = 0;
pBuilder->version = version;
}
int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes) {
if (!isValidDataType(type)) return -1;
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
STColumn* columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols);
if (columns == NULL) return -1;
pBuilder->columns = columns;
}
STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]);
colSetType(pCol, type);
colSetColId(pCol, colId);
if (pBuilder->nCols == 0) {
colSetOffset(pCol, 0);
} else {
STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols-1]);
colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]);
}
if (IS_VAR_DATA_TYPE(type)) {
colSetBytes(pCol, bytes);
pBuilder->tlen += (TYPE_BYTES[type] + bytes);
pBuilder->vlen += bytes - sizeof(VarDataLenT);
} else {
colSetBytes(pCol, TYPE_BYTES[type]);
pBuilder->tlen += TYPE_BYTES[type];
pBuilder->vlen += TYPE_BYTES[type];
}
pBuilder->nCols++;
pBuilder->flen += TYPE_BYTES[type];
ASSERT(pCol->offset < pBuilder->flen);
return 0;
}
STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
if (pBuilder->nCols <= 0) return NULL;
int tlen = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols;
STSchema *pSchema = (STSchema *)malloc(tlen);
if (pSchema == NULL) return NULL;
schemaVersion(pSchema) = pBuilder->version;
schemaNCols(pSchema) = pBuilder->nCols;
schemaTLen(pSchema) = pBuilder->tlen;
schemaFLen(pSchema) = pBuilder->flen;
schemaVLen(pSchema) = pBuilder->vlen;
memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);
return pSchema;
}
/**
* Initialize a data row
*/
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema));
dataRowSetVersion(row, schemaVersion(pSchema));
}
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
int32_t size = dataRowMaxBytesFromSchema(pSchema);
SDataRow row = malloc(size);
if (row == NULL) return NULL;
tdInitDataRow(row, pSchema);
return row;
}
/**
* Free the SDataRow object
*/
void tdFreeDataRow(SDataRow row) {
if (row) free(row);
}
SDataRow tdDataRowDup(SDataRow row) {
SDataRow trow = malloc(dataRowLen(row));
if (trow == NULL) return NULL;
dataRowCpy(trow, row);
return trow;
}
SMemRow tdMemRowDup(SMemRow row) {
SMemRow trow = malloc(memRowTLen(row));
if (trow == NULL) return NULL;
memRowCpy(trow, row);
return trow;
}
void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
pDataCol->type = colType(pCol);
pDataCol->colId = colColId(pCol);
pDataCol->bytes = colBytes(pCol);
pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE;
pDataCol->len = 0;
}
// value from timestamp should be TKEY here instead of TSKEY
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL);
if (isAllRowsNull(pCol)) {
if (isNull(value, pCol->type)) {
// all null value yet, just return
return 0;
}
if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
if (numOfRows > 0) {
// Find the first not null value, fill all previouse values as NULL
dataColSetNEleNull(pCol, numOfRows);
}
}
if (IS_VAR_DATA_TYPE(pCol->type)) {
// set offset
pCol->dataOff[numOfRows] = pCol->len;
// Copy data
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
// Update the length
pCol->len += varDataTLen(value);
} else {
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
pCol->len += pCol->bytes;
}
return 0;
}
static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
} else {
return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row);
}
}
bool isNEleNull(SDataCol *pCol, int nEle) {
if(isAllRowsNull(pCol)) return true;
for (int i = 0; i < nEle; i++) {
if (!isNull(tdGetColDataOfRowUnsafe(pCol, i), pCol->type)) return false;
}
return true;
}
static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff[index] = pCol->len;
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
setVardataNull(ptr, pCol->type);
pCol->len += varDataTLen(ptr);
} else {
setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
pCol->len += TYPE_BYTES[pCol->type];
}
}
static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->len = 0;
for (int i = 0; i < nEle; i++) {
dataColSetNullAt(pCol, i);
}
} else {
setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
pCol->len = TYPE_BYTES[pCol->type] * nEle;
}
}
void dataColSetOffset(SDataCol *pCol, int nEle) {
ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));
void *tptr = pCol->pData;
// char *tptr = (char *)(pCol->pData);
VarDataOffsetT offset = 0;
for (int i = 0; i < nEle; i++) {
pCol->dataOff[i] = offset;
offset += varDataTLen(tptr);
tptr = POINTER_SHIFT(tptr, varDataTLen(tptr));
}
}
SDataCols *tdNewDataCols(int maxCols, int maxRows) {
SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols));
if (pCols == NULL) {
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno));
return NULL;
}
pCols->maxPoints = maxRows;
pCols->maxCols = maxCols;
pCols->numOfRows = 0;
pCols->numOfCols = 0;
if (maxCols > 0) {
pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol));
if (pCols->cols == NULL) {
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols,
strerror(errno));
tdFreeDataCols(pCols);
return NULL;
}
int i;
for(i = 0; i < maxCols; i++) {
pCols->cols[i].spaceSize = 0;
pCols->cols[i].len = 0;
pCols->cols[i].pData = NULL;
pCols->cols[i].dataOff = NULL;
}
}
return pCols;
}
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
int i;
int oldMaxCols = pCols->maxCols;
if (schemaNCols(pSchema) > oldMaxCols) {
pCols->maxCols = schemaNCols(pSchema);
void* ptr = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols);
if (ptr == NULL) return -1;
pCols->cols = ptr;
for(i = oldMaxCols; i < pCols->maxCols; i++) {
pCols->cols[i].pData = NULL;
pCols->cols[i].dataOff = NULL;
pCols->cols[i].spaceSize = 0;
}
}
tdResetDataCols(pCols);
pCols->numOfCols = schemaNCols(pSchema);
for (i = 0; i < schemaNCols(pSchema); i++) {
dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints);
}
return 0;
}
SDataCols *tdFreeDataCols(SDataCols *pCols) {
int i;
if (pCols) {
if(pCols->cols) {
int maxCols = pCols->maxCols;
for(i = 0; i < maxCols; i++) {
SDataCol *pCol = &pCols->cols[i];
tfree(pCol->pData);
}
free(pCols->cols);
pCols->cols = NULL;
}
free(pCols);
}
return NULL;
}
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints);
if (pRet == NULL) return NULL;
pRet->numOfCols = pDataCols->numOfCols;
pRet->sversion = pDataCols->sversion;
if (keepData) pRet->numOfRows = pDataCols->numOfRows;
for (int i = 0; i < pDataCols->numOfCols; i++) {
pRet->cols[i].type = pDataCols->cols[i].type;
pRet->cols[i].colId = pDataCols->cols[i].colId;
pRet->cols[i].bytes = pDataCols->cols[i].bytes;
pRet->cols[i].offset = pDataCols->cols[i].offset;
if (keepData) {
if (pDataCols->cols[i].len > 0) {
if(tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints) < 0) {
tdFreeDataCols(pRet);
return NULL;
}
pRet->cols[i].len = pDataCols->cols[i].len;
memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len);
if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) {
int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints;
memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, dataOffSize);
}
}
}
}
return pRet;
}
void tdResetDataCols(SDataCols *pCols) {
if (pCols != NULL) {
pCols->numOfRows = 0;
for (int i = 0; i < pCols->maxCols; i++) {
dataColReset(pCols->cols + i);
}
}
}
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));
int rcol = 0;
int dcol = 0;
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++;
continue;
}
STColumn *pRowCol = schemaColAt(pSchema, rcol);
if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++;
rcol++;
} else if (pRowCol->colId < pDataCol->colId) {
rcol++;
} else {
if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
dcol++;
}
}
pCols->numOfRows++;
}
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));
int rcol = 0;
int dcol = 0;
int nRowCols = kvRowNCols(row);
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
++dcol;
continue;
}
SColIdx *colIdx = kvRowColIdxAt(row, rcol);
if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
++dcol;
++rcol;
} else if (colIdx->colId < pDataCol->colId) {
++rcol;
} else {
if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
++dcol;
}
}
pCols->numOfRows++;
}
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
if (isDataRow(row)) {
tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull);
} else if (isKvRow(row)) {
tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull);
} else {
ASSERT(0);
}
}
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
ASSERT(target->numOfCols == source->numOfCols);
int offset = 0;
if (pOffset == NULL) {
pOffset = &offset;
}
SDataCols *pTarget = NULL;
if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset))) { // No overlap
ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) {
if (source->cols[j].len > 0 || target->cols[j].len > 0) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows,
target->maxPoints);
}
}
target->numOfRows++;
}
(*pOffset) += rowsToMerge;
} else {
pTarget = tdDupDataCols(target, true);
if (pTarget == NULL) goto _err;
int iter1 = 0;
tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows,
pTarget->numOfRows + rowsToMerge, forceSetNull);
}
tdFreeDataCols(pTarget);
return 0;
_err:
tdFreeDataCols(pTarget);
return -1;
}
// src2 data has more priority than src1
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows, bool forceSetNull) {
tdResetDataCols(target);
ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows);
while (target->numOfRows < tRows) {
if (*iter1 >= limit1 && *iter2 >= limit2) break;
TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : dataColsKeyAt(src1, *iter1);
TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1);
TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : dataColsKeyAt(src2, *iter2);
TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2);
ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1)));
if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type);
if (src1->cols[i].len > 0 || target->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
}
}
target->numOfRows++;
(*iter1)++;
} else if (key1 >= key2) {
if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) {
for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == src2->cols[i].type);
if (src2->cols[i].len > 0 && !isNull(src2->cols[i].pData, src2->cols[i].type)) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
target->maxPoints);
} else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) {
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints);
} else if(target->cols[i].len > 0) {
dataColSetNullAt(&target->cols[i], target->numOfRows);
}
}
target->numOfRows++;
}
(*iter2)++;
if (key1 == key2) (*iter1)++;
}
ASSERT(target->numOfRows <= target->maxPoints);
}
}
SKVRow tdKVRowDup(SKVRow row) {
SKVRow trow = malloc(kvRowLen(row));
if (trow == NULL) return NULL;
kvRowCpy(trow, row);
return trow;
}
static int compareColIdx(const void* a, const void* b) {
const SColIdx* x = (const SColIdx*)a;
const SColIdx* y = (const SColIdx*)b;
if (x->colId > y->colId) {
return 1;
}
if (x->colId < y->colId) {
return -1;
}
return 0;
}
void tdSortKVRowByColIdx(SKVRow row) {
qsort(kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), compareColIdx);
}
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
SColIdx *pColIdx = NULL;
SKVRow row = *orow;
SKVRow nrow = NULL;
void * ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);
if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) { // need to add a column value to the row
int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
int nRowLen = kvRowLen(row) + sizeof(SColIdx) + diff;
int oRowCols = kvRowNCols(row);
ASSERT(diff > 0);
nrow = malloc(nRowLen);
if (nrow == NULL) return -1;
kvRowSetLen(nrow, nRowLen);
kvRowSetNCols(nrow, oRowCols + 1);
memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * oRowCols);
memcpy(kvRowValues(nrow), kvRowValues(row), kvRowValLen(row));
pColIdx = kvRowColIdxAt(nrow, oRowCols);
pColIdx->colId = colId;
pColIdx->offset = kvRowValLen(row);
memcpy(kvRowColVal(nrow, pColIdx), value, diff); // copy new value
tdSortKVRowByColIdx(nrow);
*orow = nrow;
free(row);
} else {
ASSERT(((SColIdx *)ptr)->colId == colId);
if (IS_VAR_DATA_TYPE(type)) {
void *pOldVal = kvRowColVal(row, (SColIdx *)ptr);
if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place
memcpy(pOldVal, value, varDataTLen(value));
} else { // need to reallocate the memory
int16_t nlen = kvRowLen(row) + (varDataTLen(value) - varDataTLen(pOldVal));
ASSERT(nlen > 0);
nrow = malloc(nlen);
if (nrow == NULL) return -1;
kvRowSetLen(nrow, nlen);
kvRowSetNCols(nrow, kvRowNCols(row));
int zsize = sizeof(SColIdx) * kvRowNCols(row) + ((SColIdx *)ptr)->offset;
memcpy(kvRowColIdx(nrow), kvRowColIdx(row), zsize);
memcpy(kvRowColVal(nrow, ((SColIdx *)ptr)), value, varDataTLen(value));
// Copy left value part
int lsize = kvRowLen(row) - TD_KV_ROW_HEAD_SIZE - zsize - varDataTLen(pOldVal);
if (lsize > 0) {
memcpy(POINTER_SHIFT(nrow, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(value)),
POINTER_SHIFT(row, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(pOldVal)), lsize);
}
for (int i = 0; i < kvRowNCols(nrow); i++) {
pColIdx = kvRowColIdxAt(nrow, i);
if (pColIdx->offset > ((SColIdx *)ptr)->offset) {
pColIdx->offset = pColIdx->offset - varDataTLen(pOldVal) + varDataTLen(value);
}
}
*orow = nrow;
free(row);
}
} else {
memcpy(kvRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]);
}
}
return 0;
}
int tdEncodeKVRow(void **buf, SKVRow row) {
// May change the encode purpose
if (buf != NULL) {
kvRowCpy(*buf, row);
*buf = POINTER_SHIFT(*buf, kvRowLen(row));
}
return kvRowLen(row);
}
void *tdDecodeKVRow(void *buf, SKVRow *row) {
*row = tdKVRowDup(buf);
if (*row == NULL) return NULL;
return POINTER_SHIFT(buf, kvRowLen(*row));
}
int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) {
pBuilder->tCols = 128;
pBuilder->nCols = 0;
pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols);
if (pBuilder->pColIdx == NULL) return -1;
pBuilder->alloc = 1024;
pBuilder->size = 0;
pBuilder->buf = malloc(pBuilder->alloc);
if (pBuilder->buf == NULL) {
free(pBuilder->pColIdx);
return -1;
}
return 0;
}
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) {
tfree(pBuilder->pColIdx);
tfree(pBuilder->buf);
}
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
pBuilder->nCols = 0;
pBuilder->size = 0;
}
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
if (tlen == 0) return NULL;
tlen += TD_KV_ROW_HEAD_SIZE;
SKVRow row = malloc(tlen);
if (row == NULL) return NULL;
kvRowSetNCols(row, pBuilder->nCols);
kvRowSetLen(row, tlen);
memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
return row;
}
SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) {
#if 0
ASSERT(memRowKey(row1) == memRowKey(row2));
ASSERT(schemaVersion(pSchema1) == memRowVersion(row1));
ASSERT(schemaVersion(pSchema2) == memRowVersion(row2));
ASSERT(schemaVersion(pSchema1) >= schemaVersion(pSchema2));
#endif
SArray *stashRow = taosArrayInit(pSchema1->numOfCols, sizeof(SColInfo));
if (stashRow == NULL) {
return NULL;
}
SMemRow pRow = buffer;
SDataRow dataRow = memRowDataBody(pRow);
memRowSetType(pRow, SMEM_ROW_DATA);
dataRowSetVersion(dataRow, schemaVersion(pSchema1)); // use latest schema version
dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen));
TDRowLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE;
int32_t i = 0; // row1
int32_t j = 0; // row2
int32_t nCols1 = schemaNCols(pSchema1);
int32_t nCols2 = schemaNCols(pSchema2);
SColInfo colInfo = {0};
int32_t kvIdx1 = 0, kvIdx2 = 0;
while (i < nCols1) {
STColumn *pCol = schemaColAt(pSchema1, i);
void * val1 = tdGetMemRowDataOfColEx(row1, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx1);
// if val1 != NULL, use val1;
if (val1 != NULL && !isNull(val1, pCol->type)) {
tdAppendColVal(dataRow, val1, pCol->type, pCol->offset);
kvLen += tdGetColAppendLen(SMEM_ROW_KV, val1, pCol->type);
setSColInfo(&colInfo, pCol->colId, pCol->type, val1);
taosArrayPush(stashRow, &colInfo);
++i; // next col
continue;
}
void *val2 = NULL;
while (j < nCols2) {
STColumn *tCol = schemaColAt(pSchema2, j);
if (tCol->colId < pCol->colId) {
++j;
continue;
}
if (tCol->colId == pCol->colId) {
val2 = tdGetMemRowDataOfColEx(row2, tCol->colId, tCol->type, TD_DATA_ROW_HEAD_SIZE + tCol->offset, &kvIdx2);
} else if (tCol->colId > pCol->colId) {
// set NULL
}
break;
} // end of while(j<nCols2)
if (val2 == NULL) {
val2 = (void *)getNullValue(pCol->type);
}
tdAppendColVal(dataRow, val2, pCol->type, pCol->offset);
if (!isNull(val2, pCol->type)) {
kvLen += tdGetColAppendLen(SMEM_ROW_KV, val2, pCol->type);
setSColInfo(&colInfo, pCol->colId, pCol->type, val2);
taosArrayPush(stashRow, &colInfo);
}
++i; // next col
}
dataLen = memRowTLen(pRow);
if (kvLen < dataLen) {
// scan stashRow and generate SKVRow
memset(buffer, 0, sizeof(dataLen));
SMemRow tRow = buffer;
memRowSetType(tRow, SMEM_ROW_KV);
SKVRow kvRow = (SKVRow)memRowKvBody(tRow);
int16_t nKvNCols = (int16_t) taosArrayGetSize(stashRow);
kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nKvNCols));
kvRowSetNCols(kvRow, nKvNCols);
memRowSetKvVersion(tRow, pSchema1->version);
int32_t toffset = 0;
int16_t k;
for (k = 0; k < nKvNCols; ++k) {
SColInfo *pColInfo = taosArrayGet(stashRow, k);
tdAppendKvColVal(kvRow, pColInfo->colVal, true, pColInfo->colId, pColInfo->colType, toffset);
toffset += sizeof(SColIdx);
}
ASSERT(kvLen == memRowTLen(tRow));
}
taosArrayDestroy(stashRow);
return buffer;
}
#endif
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
cea242bf
...
...
@@ -699,7 +699,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe
STbData
**
pMem
=
NULL
;
STbData
**
pIMem
=
NULL
;
TKEY
tLastKey
=
0
;
/// keyToTkey(pCheckInfo->lastKey);
T
S
KEY
tLastKey
=
0
;
/// keyToTkey(pCheckInfo->lastKey);
if
(
pHandle
->
pTsdb
->
mem
!=
NULL
)
{
pMem
=
taosHashGet
(
pHandle
->
pTsdb
->
mem
->
pHashIdx
,
&
pCheckInfo
->
tableId
,
sizeof
(
pCheckInfo
->
tableId
));
if
(
pMem
!=
NULL
)
{
...
...
@@ -1661,11 +1661,14 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
SET_DOUBLE_PTR
(
pData
,
value
);
break
;
case
TSDB_DATA_TYPE_TIMESTAMP
:
#if 0 // only TSKEY supported since 3.0
if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
*(TSKEY *)pData = tdGetKey(*(TKEY *)value);
} else {
*(TSKEY *)pData = *(TSKEY *)value;
}
#endif
*
(
TSKEY
*
)
pData
=
*
(
TSKEY
*
)
value
;
break
;
default:
memcpy
(
pData
,
value
,
pColInfo
->
info
.
bytes
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录