Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
34e5bad7
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
34e5bad7
编写于
7月 11, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact row
上级
a52c089d
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
43 addition
and
996 deletion
+43
-996
include/common/tdataformat.h
include/common/tdataformat.h
+12
-99
include/common/trow.h
include/common/trow.h
+4
-6
source/common/src/tdataformat.c
source/common/src/tdataformat.c
+25
-229
source/common/src/trow.c
source/common/src/trow.c
+2
-662
未找到文件。
include/common/tdataformat.h
浏览文件 @
34e5bad7
...
...
@@ -64,18 +64,22 @@ int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type);
int32_t
tGetValue
(
uint8_t
*
p
,
SValue
*
pValue
,
int8_t
type
);
int
tValueCmprFn
(
const
SValue
*
pValue1
,
const
SValue
*
pValue2
,
int8_t
type
);
// S
TSRow2
// S
ColVal
#define COL_VAL_NONE(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNone = 1})
#define COL_VAL_NULL(CID, TYPE) ((SColVal){.cid = (CID), .type = (TYPE), .isNull = 1})
#define COL_VAL_VALUE(CID, TYPE, V) ((SColVal){.cid = (CID), .type = (TYPE), .value = (V)})
// STSRow2
#define TSROW_LEN(PROW, V) tGetI32v((uint8_t *)(PROW)->data, (V) ? &(V) : NULL)
#define TSROW_SVER(PROW, V) tGetI32v((PROW)->data + TSROW_LEN(PROW, NULL), (V) ? &(V) : NULL)
int32_t
tTSRowNew
(
STSRowBuilder
*
pBuilder
,
SArray
*
pArray
,
STSchema
*
pTSchema
,
STSRow2
**
ppRow
);
int32_t
tTSRowClone
(
const
STSRow2
*
pRow
,
STSRow2
**
ppRow
);
void
tTSRowFree
(
STSRow2
*
pRow
);
void
tTSRowGet
(
STSRow2
*
pRow
,
STSchema
*
pTSchema
,
int32_t
iCol
,
SColVal
*
pColVal
);
int32_t
tTSRowToArray
(
STSRow2
*
pRow
,
STSchema
*
pTSchema
,
SArray
**
ppArray
);
int32_t
tPutTSRow
(
uint8_t
*
p
,
STSRow2
*
pRow
);
int32_t
tGetTSRow
(
uint8_t
*
p
,
STSRow2
*
pRow
);
int32_t
tGetTSRow
(
uint8_t
*
p
,
STSRow2
*
*
p
pRow
);
// STSRowBuilder
#define tsRowBuilderInit() ((STSRowBuilder){0})
...
...
@@ -97,7 +101,7 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t
tDecodeTag
(
SDecoder
*
pDecoder
,
STag
**
ppTag
);
int32_t
tTagToValArray
(
const
STag
*
pTag
,
SArray
**
ppArray
);
void
debugPrintSTag
(
STag
*
pTag
,
const
char
*
tag
,
int32_t
ln
);
// TODO: remove
int32_t
parseJsontoTagData
(
const
char
*
json
,
SArray
*
pTagVals
,
STag
**
ppTag
,
void
*
pMsgBuf
);
int32_t
parseJsontoTagData
(
const
char
*
json
,
SArray
*
pTagVals
,
STag
**
ppTag
,
void
*
pMsgBuf
);
// STRUCT =================
struct
STColumn
{
...
...
@@ -123,13 +127,13 @@ struct STSchema {
#define TSROW_KV_SMALL ((uint8_t)0x10U)
#define TSROW_KV_MID ((uint8_t)0x20U)
#define TSROW_KV_BIG ((uint8_t)0x40U)
#pragma pack(push, 1)
struct
STSRow2
{
TSKEY
ts
;
uint8_t
flags
;
int32_t
sver
;
uint32_t
nData
;
uint8_t
*
pData
;
TSKEY
ts
;
uint8_t
flags
;
uint8_t
data
[];
};
#pragma pack(pop)
struct
STSRowBuilder
{
STSRow2
tsRow
;
...
...
@@ -343,97 +347,6 @@ static FORCE_INLINE int32_t tkeyComparFn(const void *tkey1, const void *tkey2) {
}
}
// ----------------- Data column structure
// SDataCol arrangement: data => bitmap => dataOffset
typedef
struct
SDataCol
{
int8_t
type
;
// column type
uint8_t
bitmap
:
1
;
// 0: no bitmap if all rows are NORM, 1: has bitmap if has NULL/NORM rows
uint8_t
reserve
:
7
;
int16_t
colId
;
// column ID
int32_t
bytes
;
// column data bytes defined
int32_t
offset
;
// data offset in a SDataRow (including the header size)
int32_t
spaceSize
;
// Total space size for this column
int32_t
len
;
// column data length
VarDataOffsetT
*
dataOff
;
// For binary and nchar data, the offset in the data column
void
*
pData
;
// Actual data pointer
void
*
pBitmap
;
// Bitmap pointer
TSKEY
ts
;
// only used in last NULL column
}
SDataCol
;
#define isAllRowsNull(pCol) ((pCol)->len == 0)
#define isAllRowsNone(pCol) ((pCol)->len == 0)
static
FORCE_INLINE
void
dataColReset
(
SDataCol
*
pDataCol
)
{
pDataCol
->
len
=
0
;
}
int32_t
tdAllocMemForCol
(
SDataCol
*
pCol
,
int32_t
maxPoints
);
void
dataColInit
(
SDataCol
*
pDataCol
,
STColumn
*
pCol
,
int32_t
maxPoints
);
int32_t
dataColAppendVal
(
SDataCol
*
pCol
,
const
void
*
value
,
int32_t
numOfRows
,
int32_t
maxPoints
);
void
*
dataColSetOffset
(
SDataCol
*
pCol
,
int32_t
nEle
);
bool
isNEleNull
(
SDataCol
*
pCol
,
int32_t
nEle
);
typedef
struct
{
col_id_t
maxCols
;
// max number of columns
col_id_t
numOfCols
;
// Total number of cols
int32_t
maxPoints
;
// max number of points
int32_t
numOfRows
;
int32_t
bitmapMode
:
1
;
// default is 0(2 bits), otherwise 1(1 bit)
int32_t
sversion
:
31
;
// TODO: set sversion(not used yet)
SDataCol
*
cols
;
}
SDataCols
;
static
FORCE_INLINE
bool
tdDataColsIsBitmapI
(
SDataCols
*
pCols
)
{
return
pCols
->
bitmapMode
!=
TSDB_BITMODE_DEFAULT
;
}
static
FORCE_INLINE
void
tdDataColsSetBitmapI
(
SDataCols
*
pCols
)
{
pCols
->
bitmapMode
=
TSDB_BITMODE_ONE_BIT
;
}
static
FORCE_INLINE
bool
tdIsBitmapModeI
(
int8_t
bitmapMode
)
{
return
bitmapMode
!=
TSDB_BITMODE_DEFAULT
;
}
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] // the idx row of column-wised data
#define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx))
static
FORCE_INLINE
TKEY
dataColsTKeyFirst
(
SDataCols
*
pCols
)
{
if
(
pCols
->
numOfRows
)
{
return
dataColsTKeyAt
(
pCols
,
0
);
}
else
{
return
TKEY_INVALID
;
}
}
static
FORCE_INLINE
TSKEY
dataColsKeyAtRow
(
SDataCols
*
pCols
,
int32_t
row
)
{
assert
(
row
<
pCols
->
numOfRows
);
return
dataColsKeyAt
(
pCols
,
row
);
}
static
FORCE_INLINE
TSKEY
dataColsKeyFirst
(
SDataCols
*
pCols
)
{
if
(
pCols
->
numOfRows
)
{
return
dataColsKeyAt
(
pCols
,
0
);
}
else
{
return
TSDB_DATA_TIMESTAMP_NULL
;
}
}
static
FORCE_INLINE
TKEY
dataColsTKeyLast
(
SDataCols
*
pCols
)
{
if
(
pCols
->
numOfRows
)
{
return
dataColsTKeyAt
(
pCols
,
pCols
->
numOfRows
-
1
);
}
else
{
return
TKEY_INVALID
;
}
}
static
FORCE_INLINE
TSKEY
dataColsKeyLast
(
SDataCols
*
pCols
)
{
if
(
pCols
->
numOfRows
)
{
return
dataColsKeyAt
(
pCols
,
pCols
->
numOfRows
-
1
);
}
else
{
return
TSDB_DATA_TIMESTAMP_NULL
;
}
}
SDataCols
*
tdNewDataCols
(
int32_t
maxCols
,
int32_t
maxRows
);
void
tdResetDataCols
(
SDataCols
*
pCols
);
int32_t
tdInitDataCols
(
SDataCols
*
pCols
,
STSchema
*
pSchema
);
SDataCols
*
tdDupDataCols
(
SDataCols
*
pCols
,
bool
keepData
);
SDataCols
*
tdFreeDataCols
(
SDataCols
*
pCols
);
int32_t
tdMergeDataCols
(
SDataCols
*
target
,
SDataCols
*
source
,
int32_t
rowsToMerge
,
int32_t
*
pOffset
,
bool
update
,
TDRowVerT
maxVer
);
#endif
#ifdef __cplusplus
...
...
include/common/trow.h
浏览文件 @
34e5bad7
...
...
@@ -223,9 +223,10 @@ int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDR
static
FORCE_INLINE
int32_t
tdGetBitmapValType
(
const
void
*
pBitmap
,
int16_t
colIdx
,
TDRowValT
*
pValType
,
int8_t
bitmapMode
);
bool
tdIsBitmapBlkNorm
(
const
void
*
pBitmap
,
int32_t
numOfBits
,
int8_t
bitmapMode
);
int32_t
tdAppendValToDataCol
(
SDataCol
*
pCol
,
TDRowValT
valType
,
const
void
*
val
,
int32_t
numOfRows
,
int32_t
maxPoints
,
int8_t
bitmapMode
,
bool
isMerge
);
int32_t
tdAppendSTSRowToDataCol
(
STSRow
*
pRow
,
STSchema
*
pSchema
,
SDataCols
*
pCols
,
bool
isMerge
);
// int32_t tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int32_t numOfRows, int32_t
// maxPoints,
// int8_t bitmapMode, bool isMerge);
// int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool isMerge);
int32_t
tdGetBitmapValTypeII
(
const
void
*
pBitmap
,
int16_t
colIdx
,
TDRowValT
*
pValType
);
int32_t
tdSetBitmapValTypeI
(
void
*
pBitmap
,
int16_t
colIdx
,
TDRowValT
valType
);
...
...
@@ -318,12 +319,9 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SC
bool
tdGetTpRowDataOfCol
(
STSRowIter
*
pIter
,
col_type_t
colType
,
int32_t
offset
,
SCellVal
*
pVal
);
bool
tdGetKvRowValOfColEx
(
STSRowIter
*
pIter
,
col_id_t
colId
,
col_type_t
colType
,
col_id_t
*
nIdx
,
SCellVal
*
pVal
);
bool
tdSTSRowIterNext
(
STSRowIter
*
pIter
,
col_id_t
colId
,
col_type_t
colType
,
SCellVal
*
pVal
);
STSRow
*
mergeTwoRows
(
void
*
buffer
,
STSRow
*
row1
,
STSRow
*
row2
,
STSchema
*
pSchema1
,
STSchema
*
pSchema2
);
int32_t
tdGetColDataOfRow
(
SCellVal
*
pVal
,
SDataCol
*
pCol
,
int32_t
row
,
int8_t
bitmapMode
);
bool
tdSTpRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
col_type_t
colType
,
int32_t
flen
,
uint32_t
offset
,
col_id_t
colIdx
,
SCellVal
*
pVal
);
bool
tdSKvRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
col_id_t
colIdx
,
SCellVal
*
pVal
);
int32_t
dataColGetNEleLen
(
SDataCol
*
pDataCol
,
int32_t
rows
,
int8_t
bitmapMode
);
void
tdSCellValPrint
(
SCellVal
*
pVal
,
int8_t
colType
);
void
tdSRowPrint
(
STSRow
*
row
,
STSchema
*
pSchema
,
const
char
*
tag
);
...
...
source/common/src/tdataformat.c
浏览文件 @
34e5bad7
...
...
@@ -175,7 +175,8 @@ static void setBitMap(uint8_t *pb, uint8_t v, int32_t idx, uint8_t flags) {
} while (0)
int32_t
tTSRowNew
(
STSRowBuilder
*
pBuilder
,
SArray
*
pArray
,
STSchema
*
pTSchema
,
STSRow2
**
ppRow
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
#if 0
STColumn *pTColumn;
SColVal *pColVal;
int32_t nColVal = taosArrayGetSize(pArray);
...
...
@@ -462,30 +463,22 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S
}
}
#endif
_exit:
return
code
;
}
int32_t
tTSRowClone
(
const
STSRow2
*
pRow
,
STSRow2
**
ppRow
)
{
int32_t
code
=
0
;
int32_t
rLen
;
(
*
ppRow
)
=
(
STSRow2
*
)
taosMemoryMalloc
(
sizeof
(
**
ppRow
));
TSROW_LEN
(
pRow
,
rLen
);
(
*
ppRow
)
=
(
STSRow2
*
)
taosMemoryMalloc
(
rLen
);
if
(
*
ppRow
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
**
ppRow
=
*
pRow
;
(
*
ppRow
)
->
pData
=
NULL
;
if
(
pRow
->
nData
)
{
(
*
ppRow
)
->
pData
=
taosMemoryMalloc
(
pRow
->
nData
);
if
((
*
ppRow
)
->
pData
==
NULL
)
{
taosMemoryFree
(
*
ppRow
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
memcpy
((
*
ppRow
)
->
pData
,
pRow
->
pData
,
pRow
->
nData
);
}
memcpy
(
*
ppRow
,
pRow
,
rLen
);
_exit:
return
code
;
...
...
@@ -493,12 +486,12 @@ _exit:
void
tTSRowFree
(
STSRow2
*
pRow
)
{
if
(
pRow
)
{
if
(
pRow
->
pData
)
taosMemoryFree
(
pRow
->
pData
);
taosMemoryFree
(
pRow
);
}
}
void
tTSRowGet
(
STSRow2
*
pRow
,
STSchema
*
pTSchema
,
int32_t
iCol
,
SColVal
*
pColVal
)
{
#if 0
uint8_t isTuple = ((pRow->flags & 0xf0) == 0) ? 1 : 0;
STColumn *pTColumn = &pTSchema->columns[iCol];
uint8_t flags = pRow->flags & (uint8_t)0xf;
...
...
@@ -643,10 +636,12 @@ _return_null:
_return_value:
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value);
return;
#endif
}
int32_t
tTSRowToArray
(
STSRow2
*
pRow
,
STSchema
*
pTSchema
,
SArray
**
ppArray
)
{
int32_t
code
=
0
;
#if 0
SColVal cv;
(*ppArray) = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
...
...
@@ -660,52 +655,27 @@ int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray) {
taosArrayPush(*ppArray, &cv);
}
#endif
_exit:
return
code
;
}
int32_t
tPutTSRow
(
uint8_t
*
p
,
STSRow2
*
pRow
)
{
int32_t
n
=
0
;
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pRow
->
ts
);
n
+=
tPutI8
(
p
?
p
+
n
:
p
,
pRow
->
flags
);
n
+=
tPutI32v
(
p
?
p
+
n
:
p
,
pRow
->
sver
);
int32_t
n
;
ASSERT
(
pRow
->
flags
&
0xf
);
switch
(
pRow
->
flags
&
0xf
)
{
case
TSROW_HAS_NONE
:
case
TSROW_HAS_NULL
:
ASSERT
(
pRow
->
nData
==
0
);
ASSERT
(
pRow
->
pData
==
NULL
);
break
;
default:
ASSERT
(
pRow
->
nData
&&
pRow
->
pData
);
n
+=
tPutBinary
(
p
?
p
+
n
:
p
,
pRow
->
pData
,
pRow
->
nData
);
break
;
TSROW_LEN
(
pRow
,
n
);
if
(
p
)
{
memcpy
(
p
,
pRow
,
n
);
}
return
n
;
}
int32_t
tGetTSRow
(
uint8_t
*
p
,
STSRow2
*
pRow
)
{
int32_t
n
=
0
;
int32_t
tGetTSRow
(
uint8_t
*
p
,
STSRow2
*
*
p
pRow
)
{
int32_t
n
;
n
+=
tGetI64
(
p
+
n
,
&
pRow
->
ts
);
n
+=
tGetI8
(
p
+
n
,
&
pRow
->
flags
);
n
+=
tGetI32v
(
p
+
n
,
&
pRow
->
sver
);
ASSERT
(
pRow
->
flags
);
switch
(
pRow
->
flags
&
0xf
)
{
case
TSROW_HAS_NONE
:
case
TSROW_HAS_NULL
:
pRow
->
nData
=
0
;
pRow
->
pData
=
NULL
;
break
;
default:
n
+=
tGetBinary
(
p
+
n
,
&
pRow
->
pData
,
&
pRow
->
nData
);
break
;
}
*
ppRow
=
(
STSRow2
*
)
p
;
TSROW_LEN
(
*
ppRow
,
n
);
return
n
;
}
...
...
@@ -904,15 +874,13 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
return
n
;
}
bool
tTagIsJson
(
const
void
*
pTag
){
return
(((
const
STag
*
)
pTag
)
->
flags
&
TD_TAG_JSON
);
}
bool
tTagIsJson
(
const
void
*
pTag
)
{
return
(((
const
STag
*
)
pTag
)
->
flags
&
TD_TAG_JSON
);
}
bool
tTagIsJsonNull
(
void
*
data
){
STag
*
pTag
=
(
STag
*
)
data
;
int8_t
isJson
=
tTagIsJson
(
pTag
);
if
(
!
isJson
)
return
false
;
return
((
STag
*
)
data
)
->
nTag
==
0
;
bool
tTagIsJsonNull
(
void
*
data
)
{
STag
*
pTag
=
(
STag
*
)
data
;
int8_t
isJson
=
tTagIsJson
(
pTag
);
if
(
!
isJson
)
return
false
;
return
((
STag
*
)
data
)
->
nTag
==
0
;
}
int32_t
tTagNew
(
SArray
*
pArray
,
int32_t
version
,
int8_t
isJson
,
STag
**
ppTag
)
{
...
...
@@ -1097,46 +1065,6 @@ _err:
}
#if 1 // ===================================================================================================================
static
void
dataColSetNEleNull
(
SDataCol
*
pCol
,
int
nEle
);
int
tdAllocMemForCol
(
SDataCol
*
pCol
,
int
maxPoints
)
{
int
spaceNeeded
=
pCol
->
bytes
*
maxPoints
;
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
spaceNeeded
+=
sizeof
(
VarDataOffsetT
)
*
maxPoints
;
}
#ifdef TD_SUPPORT_BITMAP
int32_t
nBitmapBytes
=
(
int32_t
)
TD_BITMAP_BYTES
(
maxPoints
);
spaceNeeded
+=
(
int
)
nBitmapBytes
;
// TODO: Currently, the compression of bitmap parts is affiliated to the column data parts, thus allocate 1 more
// TYPE_BYTES as to comprise complete TYPE_BYTES. Otherwise, invalid read/write would be triggered.
// spaceNeeded += TYPE_BYTES[pCol->type]; // the bitmap part is append as a single part since 2022.04.03, thus
// remove the additional space
#endif
if
(
pCol
->
spaceSize
<
spaceNeeded
)
{
void
*
ptr
=
taosMemoryRealloc
(
pCol
->
pData
,
spaceNeeded
);
if
(
ptr
==
NULL
)
{
uDebug
(
"malloc failure, size:%"
PRId64
" failed, reason:%s"
,
(
int64_t
)
spaceNeeded
,
strerror
(
errno
));
return
-
1
;
}
else
{
pCol
->
pData
=
ptr
;
pCol
->
spaceSize
=
spaceNeeded
;
}
}
#ifdef TD_SUPPORT_BITMAP
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
pCol
->
pBitmap
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
bytes
*
maxPoints
);
pCol
->
dataOff
=
POINTER_SHIFT
(
pCol
->
pBitmap
,
nBitmapBytes
);
}
else
{
pCol
->
pBitmap
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
bytes
*
maxPoints
);
}
#else
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
pCol
->
dataOff
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
bytes
*
maxPoints
);
}
#endif
return
0
;
}
/**
* Duplicate the schema and return a new object
...
...
@@ -1290,136 +1218,4 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
return
pSchema
;
}
void
dataColInit
(
SDataCol
*
pDataCol
,
STColumn
*
pCol
,
int
maxPoints
)
{
pDataCol
->
type
=
colType
(
pCol
);
pDataCol
->
colId
=
colColId
(
pCol
);
pDataCol
->
bytes
=
colBytes
(
pCol
);
pDataCol
->
offset
=
colOffset
(
pCol
)
+
0
;
// TD_DATA_ROW_HEAD_SIZE;
pDataCol
->
len
=
0
;
}
static
FORCE_INLINE
const
void
*
tdGetColDataOfRowUnsafe
(
SDataCol
*
pCol
,
int
row
)
{
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
return
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
dataOff
[
row
]);
}
else
{
return
POINTER_SHIFT
(
pCol
->
pData
,
TYPE_BYTES
[
pCol
->
type
]
*
row
);
}
}
bool
isNEleNull
(
SDataCol
*
pCol
,
int
nEle
)
{
if
(
isAllRowsNull
(
pCol
))
return
true
;
for
(
int
i
=
0
;
i
<
nEle
;
++
i
)
{
if
(
!
isNull
(
tdGetColDataOfRowUnsafe
(
pCol
,
i
),
pCol
->
type
))
return
false
;
}
return
true
;
}
void
*
dataColSetOffset
(
SDataCol
*
pCol
,
int
nEle
)
{
ASSERT
(((
pCol
->
type
==
TSDB_DATA_TYPE_BINARY
)
||
(
pCol
->
type
==
TSDB_DATA_TYPE_NCHAR
)));
void
*
tptr
=
pCol
->
pData
;
// char *tptr = (char *)(pCol->pData);
VarDataOffsetT
offset
=
0
;
for
(
int
i
=
0
;
i
<
nEle
;
++
i
)
{
pCol
->
dataOff
[
i
]
=
offset
;
offset
+=
varDataTLen
(
tptr
);
tptr
=
POINTER_SHIFT
(
tptr
,
varDataTLen
(
tptr
));
}
return
POINTER_SHIFT
(
tptr
,
varDataTLen
(
tptr
));
}
SDataCols
*
tdNewDataCols
(
int
maxCols
,
int
maxRows
)
{
SDataCols
*
pCols
=
(
SDataCols
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SDataCols
));
if
(
pCols
==
NULL
)
{
uDebug
(
"malloc failure, size:%"
PRId64
" failed, reason:%s"
,
(
int64_t
)
sizeof
(
SDataCols
),
strerror
(
errno
));
return
NULL
;
}
pCols
->
maxPoints
=
maxRows
;
pCols
->
maxCols
=
maxCols
;
pCols
->
numOfRows
=
0
;
pCols
->
numOfCols
=
0
;
pCols
->
bitmapMode
=
TSDB_BITMODE_DEFAULT
;
if
(
maxCols
>
0
)
{
pCols
->
cols
=
(
SDataCol
*
)
taosMemoryCalloc
(
maxCols
,
sizeof
(
SDataCol
));
if
(
pCols
->
cols
==
NULL
)
{
uDebug
(
"malloc failure, size:%"
PRId64
" failed, reason:%s"
,
(
int64_t
)
sizeof
(
SDataCol
)
*
maxCols
,
strerror
(
errno
));
tdFreeDataCols
(
pCols
);
return
NULL
;
}
#if 0 // no need as calloc used
int i;
for (i = 0; i < maxCols; i++) {
pCols->cols[i].spaceSize = 0;
pCols->cols[i].len = 0;
pCols->cols[i].pData = NULL;
pCols->cols[i].dataOff = NULL;
}
#endif
}
return
pCols
;
}
int
tdInitDataCols
(
SDataCols
*
pCols
,
STSchema
*
pSchema
)
{
int
i
;
int
oldMaxCols
=
pCols
->
maxCols
;
if
(
schemaNCols
(
pSchema
)
>
oldMaxCols
)
{
pCols
->
maxCols
=
schemaNCols
(
pSchema
);
void
*
ptr
=
(
SDataCol
*
)
taosMemoryRealloc
(
pCols
->
cols
,
sizeof
(
SDataCol
)
*
pCols
->
maxCols
);
if
(
ptr
==
NULL
)
return
-
1
;
pCols
->
cols
=
ptr
;
for
(
i
=
oldMaxCols
;
i
<
pCols
->
maxCols
;
++
i
)
{
pCols
->
cols
[
i
].
pData
=
NULL
;
pCols
->
cols
[
i
].
dataOff
=
NULL
;
pCols
->
cols
[
i
].
pBitmap
=
NULL
;
pCols
->
cols
[
i
].
spaceSize
=
0
;
}
}
#if 0
tdResetDataCols(pCols); // redundant loop to reset len/blen to 0, already reset in following dataColInit(...)
#endif
pCols
->
numOfRows
=
0
;
pCols
->
bitmapMode
=
TSDB_BITMODE_DEFAULT
;
pCols
->
numOfCols
=
schemaNCols
(
pSchema
);
for
(
i
=
0
;
i
<
schemaNCols
(
pSchema
);
++
i
)
{
dataColInit
(
pCols
->
cols
+
i
,
schemaColAt
(
pSchema
,
i
),
pCols
->
maxPoints
);
}
return
0
;
}
SDataCols
*
tdFreeDataCols
(
SDataCols
*
pCols
)
{
int
i
;
if
(
pCols
)
{
if
(
pCols
->
cols
)
{
int
maxCols
=
pCols
->
maxCols
;
for
(
i
=
0
;
i
<
maxCols
;
++
i
)
{
SDataCol
*
pCol
=
&
pCols
->
cols
[
i
];
taosMemoryFreeClear
(
pCol
->
pData
);
}
taosMemoryFree
(
pCols
->
cols
);
pCols
->
cols
=
NULL
;
}
taosMemoryFree
(
pCols
);
}
return
NULL
;
}
void
tdResetDataCols
(
SDataCols
*
pCols
)
{
if
(
pCols
!=
NULL
)
{
pCols
->
numOfRows
=
0
;
pCols
->
bitmapMode
=
0
;
for
(
int
i
=
0
;
i
<
pCols
->
maxCols
;
++
i
)
{
dataColReset
(
pCols
->
cols
+
i
);
}
}
}
#endif
\ No newline at end of file
source/common/src/trow.c
浏览文件 @
34e5bad7
...
...
@@ -32,28 +32,10 @@ const uint8_t tdVTypeByte[2][3] = {{
};
// declaration
static
uint8_t
tdGetBitmapByte
(
uint8_t
byte
);
static
int32_t
tdCompareColId
(
const
void
*
arg1
,
const
void
*
arg2
);
static
uint8_t
tdGetBitmapByte
(
uint8_t
byte
);
static
int32_t
tdCompareColId
(
const
void
*
arg1
,
const
void
*
arg2
);
static
FORCE_INLINE
int32_t
compareKvRowColId
(
const
void
*
key1
,
const
void
*
key2
);
// static void dataColSetNEleNull(SDataCol *pCol, int nEle);
/**
* @brief src2 data has more priority than src1
*
* @param target
* @param src1
* @param iter1
* @param limit1
* @param src2
* @param iter2
* @param limit2
* @param tRows
* @param update
*/
static
void
tdMergeTwoDataCols
(
SDataCols
*
target
,
SDataCols
*
src1
,
int
*
iter1
,
int
limit1
,
SDataCols
*
src2
,
int
*
iter2
,
int
limit2
,
int
tRows
,
bool
update
);
// implementation
/**
* @brief Compress bitmap bytes comprised of 2-bits to counterpart of 1-bit.
...
...
@@ -287,33 +269,6 @@ void tdMergeBitmap(uint8_t *srcBitmap, int32_t nBits, uint8_t *dstBitmap) {
}
}
static
FORCE_INLINE
void
dataColSetNullAt
(
SDataCol
*
pCol
,
int
index
,
bool
setBitmap
,
int8_t
bitmapMode
)
{
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
pCol
->
dataOff
[
index
]
=
pCol
->
len
;
char
*
ptr
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
len
);
setVardataNull
(
ptr
,
pCol
->
type
);
pCol
->
len
+=
varDataTLen
(
ptr
);
}
else
{
setNull
(
POINTER_SHIFT
(
pCol
->
pData
,
TYPE_BYTES
[
pCol
->
type
]
*
index
),
pCol
->
type
,
pCol
->
bytes
);
pCol
->
len
+=
TYPE_BYTES
[
pCol
->
type
];
}
if
(
setBitmap
)
{
tdSetBitmapValType
(
pCol
->
pBitmap
,
index
,
TD_VTYPE_NONE
,
bitmapMode
);
}
}
// static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
// if (IS_VAR_DATA_TYPE(pCol->type)) {
// pCol->len = 0;
// for (int i = 0; i < nEle; i++) {
// dataColSetNullAt(pCol, i);
// }
// } else {
// setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
// pCol->len = TYPE_BYTES[pCol->type] * nEle;
// }
// }
/**
* @brief Set bitmap area by byte preferentially and then by bit.
*
...
...
@@ -362,56 +317,6 @@ bool tdIsBitmapBlkNorm(const void *pBitmap, int32_t numOfBits, int8_t bitmapMode
return
true
;
}
static
FORCE_INLINE
void
dataColSetNoneAt
(
SDataCol
*
pCol
,
int
index
,
bool
setBitmap
,
int8_t
bitmapMode
)
{
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
pCol
->
dataOff
[
index
]
=
pCol
->
len
;
char
*
ptr
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
len
);
setVardataNull
(
ptr
,
pCol
->
type
);
pCol
->
len
+=
varDataTLen
(
ptr
);
}
else
{
setNull
(
POINTER_SHIFT
(
pCol
->
pData
,
TYPE_BYTES
[
pCol
->
type
]
*
index
),
pCol
->
type
,
pCol
->
bytes
);
pCol
->
len
+=
TYPE_BYTES
[
pCol
->
type
];
}
if
(
setBitmap
)
{
tdSetBitmapValType
(
pCol
->
pBitmap
,
index
,
TD_VTYPE_NONE
,
bitmapMode
);
}
}
static
void
dataColSetNEleNone
(
SDataCol
*
pCol
,
int
nEle
,
int8_t
bitmapMode
)
{
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
pCol
->
len
=
0
;
for
(
int
i
=
0
;
i
<
nEle
;
++
i
)
{
dataColSetNoneAt
(
pCol
,
i
,
false
,
bitmapMode
);
}
}
else
{
setNullN
(
pCol
->
pData
,
pCol
->
type
,
pCol
->
bytes
,
nEle
);
pCol
->
len
=
TYPE_BYTES
[
pCol
->
type
]
*
nEle
;
}
#ifdef TD_SUPPORT_BITMAP
tdSetBitmapValTypeN
(
pCol
->
pBitmap
,
nEle
,
TD_VTYPE_NONE
,
bitmapMode
);
#endif
}
#if 0
void trbSetRowInfo(SRowBuilder *pRB, bool del, uint16_t sver) {
// TODO
}
void trbSetRowVersion(SRowBuilder *pRB, uint64_t ver) {
// TODO
}
void trbSetRowTS(SRowBuilder *pRB, TSKEY ts) {
// TODO
}
int trbWriteCol(SRowBuilder *pRB, void *pData, col_id_t cid) {
// TODO
return 0;
}
#endif
STSRow
*
tdRowDup
(
STSRow
*
row
)
{
STSRow
*
trow
=
taosMemoryMalloc
(
TD_ROW_LEN
(
row
));
if
(
trow
==
NULL
)
return
NULL
;
...
...
@@ -420,511 +325,6 @@ STSRow *tdRowDup(STSRow *row) {
return
trow
;
}
/**
* @brief
*
* @param pCol
* @param valType
* @param val
* @param numOfRows
* @param maxPoints
* @param bitmapMode default is 0(2 bits), otherwise 1(1 bit)
* @param isMerge merge to current row
* @return int
*/
int
tdAppendValToDataCol
(
SDataCol
*
pCol
,
TDRowValT
valType
,
const
void
*
val
,
int
numOfRows
,
int
maxPoints
,
int8_t
bitmapMode
,
bool
isMerge
)
{
TASSERT
(
pCol
!=
NULL
);
// Assume that the columns not specified during insert/upsert mean None.
if
(
isAllRowsNone
(
pCol
))
{
if
(
tdValIsNone
(
valType
))
{
// all None value yet, just return
return
0
;
}
if
(
tdAllocMemForCol
(
pCol
,
maxPoints
)
<
0
)
return
-
1
;
if
(
numOfRows
>
0
)
{
// Find the first not None value, fill all previous values as None
dataColSetNEleNone
(
pCol
,
numOfRows
,
bitmapMode
);
}
}
const
void
*
value
=
val
;
if
(
!
tdValTypeIsNorm
(
valType
)
||
!
val
)
{
// TODO:
// 1. back compatibility and easy to debug with codes of 2.0 to save NULL values.
// 2. later on, considering further optimization, don't save Null/None for VarType.
value
=
getNullValue
(
pCol
->
type
);
}
if
(
!
isMerge
)
{
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
// set offset
pCol
->
dataOff
[
numOfRows
]
=
pCol
->
len
;
// Copy data
memcpy
(
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
len
),
value
,
varDataTLen
(
value
));
// Update the length
pCol
->
len
+=
varDataTLen
(
value
);
}
else
{
ASSERT
(
pCol
->
len
==
TYPE_BYTES
[
pCol
->
type
]
*
numOfRows
);
memcpy
(
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
len
),
value
,
pCol
->
bytes
);
pCol
->
len
+=
pCol
->
bytes
;
}
}
else
if
(
!
tdValTypeIsNone
(
valType
))
{
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
// keep the last offset
// discard the last var data
int32_t
lastVarLen
=
varDataTLen
(
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
dataOff
[
numOfRows
]));
pCol
->
len
-=
lastVarLen
;
// Copy data
memcpy
(
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
len
),
value
,
varDataTLen
(
value
));
// Update the length
pCol
->
len
+=
varDataTLen
(
value
);
}
else
{
ASSERT
(
pCol
->
len
-
TYPE_BYTES
[
pCol
->
type
]
==
TYPE_BYTES
[
pCol
->
type
]
*
numOfRows
);
memcpy
(
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
len
-
TYPE_BYTES
[
pCol
->
type
]),
value
,
pCol
->
bytes
);
}
}
#ifdef TD_SUPPORT_BITMAP
if
(
!
isMerge
||
!
tdValTypeIsNone
(
valType
))
{
tdSetBitmapValType
(
pCol
->
pBitmap
,
numOfRows
,
valType
,
bitmapMode
);
}
#endif
return
0
;
}
// internal
static
int32_t
tdAppendTpRowToDataCol
(
STSRow
*
pRow
,
STSchema
*
pSchema
,
SDataCols
*
pCols
,
bool
isMerge
)
{
#if 0
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_KEY(pRow));
#endif
// Multi-Version rows with the same key and different versions supported
ASSERT
(
pCols
->
numOfRows
==
0
||
dataColsKeyLast
(
pCols
)
<=
TD_ROW_KEY
(
pRow
));
int
rcol
=
1
;
int
dcol
=
1
;
void
*
pBitmap
=
tdGetBitmapAddrTp
(
pRow
,
pSchema
->
flen
);
SDataCol
*
pDataCol
=
&
(
pCols
->
cols
[
0
]);
ASSERT
(
pDataCol
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
);
tdAppendValToDataCol
(
pDataCol
,
TD_VTYPE_NORM
,
&
pRow
->
ts
,
pCols
->
numOfRows
,
pCols
->
maxPoints
,
pCols
->
bitmapMode
,
isMerge
);
while
(
dcol
<
pCols
->
numOfCols
)
{
pDataCol
=
&
(
pCols
->
cols
[
dcol
]);
if
(
rcol
>=
schemaNCols
(
pSchema
))
{
tdAppendValToDataCol
(
pDataCol
,
TD_VTYPE_NULL
,
NULL
,
pCols
->
numOfRows
,
pCols
->
maxPoints
,
pCols
->
bitmapMode
,
isMerge
);
++
dcol
;
continue
;
}
STColumn
*
pRowCol
=
schemaColAt
(
pSchema
,
rcol
);
SCellVal
sVal
=
{
0
};
if
(
pRowCol
->
colId
==
pDataCol
->
colId
)
{
if
(
tdGetTpRowValOfCol
(
&
sVal
,
pRow
,
pBitmap
,
pRowCol
->
type
,
pRowCol
->
offset
-
sizeof
(
TSKEY
),
rcol
-
1
)
<
0
)
{
return
terrno
;
}
tdAppendValToDataCol
(
pDataCol
,
sVal
.
valType
,
sVal
.
val
,
pCols
->
numOfRows
,
pCols
->
maxPoints
,
pCols
->
bitmapMode
,
isMerge
);
++
dcol
;
++
rcol
;
}
else
if
(
pRowCol
->
colId
<
pDataCol
->
colId
)
{
++
rcol
;
}
else
{
tdAppendValToDataCol
(
pDataCol
,
TD_VTYPE_NULL
,
NULL
,
pCols
->
numOfRows
,
pCols
->
maxPoints
,
pCols
->
bitmapMode
,
isMerge
);
++
dcol
;
}
}
#if 0
++pCols->numOfRows;
#endif
return
TSDB_CODE_SUCCESS
;
}
// internal
static
int32_t
tdAppendKvRowToDataCol
(
STSRow
*
pRow
,
STSchema
*
pSchema
,
SDataCols
*
pCols
,
bool
isMerge
)
{
ASSERT
(
pCols
->
numOfRows
==
0
||
dataColsKeyLast
(
pCols
)
<
TD_ROW_KEY
(
pRow
));
int
rcol
=
0
;
int
dcol
=
1
;
int
tRowCols
=
tdRowGetNCols
(
pRow
)
-
1
;
// the primary TS key not included in kvRowColIdx part
int
tSchemaCols
=
schemaNCols
(
pSchema
)
-
1
;
void
*
pBitmap
=
tdGetBitmapAddrKv
(
pRow
,
tdRowGetNCols
(
pRow
));
SDataCol
*
pDataCol
=
&
(
pCols
->
cols
[
0
]);
ASSERT
(
pDataCol
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
);
tdAppendValToDataCol
(
pDataCol
,
TD_VTYPE_NORM
,
&
pRow
->
ts
,
pCols
->
numOfRows
,
pCols
->
maxPoints
,
pCols
->
bitmapMode
,
isMerge
);
while
(
dcol
<
pCols
->
numOfCols
)
{
pDataCol
=
&
(
pCols
->
cols
[
dcol
]);
if
(
rcol
>=
tRowCols
||
rcol
>=
tSchemaCols
)
{
tdAppendValToDataCol
(
pDataCol
,
TD_VTYPE_NULL
,
NULL
,
pCols
->
numOfRows
,
pCols
->
maxPoints
,
pCols
->
bitmapMode
,
isMerge
);
++
dcol
;
continue
;
}
SKvRowIdx
*
pIdx
=
tdKvRowColIdxAt
(
pRow
,
rcol
);
int16_t
colIdx
=
-
1
;
if
(
pIdx
)
{
colIdx
=
POINTER_DISTANCE
(
pIdx
,
TD_ROW_COL_IDX
(
pRow
))
/
sizeof
(
SKvRowIdx
);
}
TASSERT
(
colIdx
>=
0
);
SCellVal
sVal
=
{
0
};
if
(
pIdx
->
colId
==
pDataCol
->
colId
)
{
if
(
tdGetKvRowValOfCol
(
&
sVal
,
pRow
,
pBitmap
,
pIdx
->
offset
,
colIdx
)
<
0
)
{
return
terrno
;
}
tdAppendValToDataCol
(
pDataCol
,
sVal
.
valType
,
sVal
.
val
,
pCols
->
numOfRows
,
pCols
->
maxPoints
,
pCols
->
bitmapMode
,
isMerge
);
++
dcol
;
++
rcol
;
}
else
if
(
pIdx
->
colId
<
pDataCol
->
colId
)
{
++
rcol
;
}
else
{
tdAppendValToDataCol
(
pDataCol
,
TD_VTYPE_NULL
,
NULL
,
pCols
->
numOfRows
,
pCols
->
maxPoints
,
pCols
->
bitmapMode
,
isMerge
);
++
dcol
;
}
}
#if 0
++pCols->numOfRows;
#endif
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief exposed
*
* @param pRow
* @param pSchema
* @param pCols
*/
int32_t
tdAppendSTSRowToDataCol
(
STSRow
*
pRow
,
STSchema
*
pSchema
,
SDataCols
*
pCols
,
bool
isMerge
)
{
#ifdef TD_DEBUG_PRINT_TSDB_LOAD_DCOLS
printf
(
"%s:%d ts: %"
PRIi64
" sver:%d maxCols:%"
PRIi16
" nCols:%"
PRIi16
", nRows:%d
\n
"
,
__func__
,
__LINE__
,
TD_ROW_KEY
(
pRow
),
TD_ROW_SVER
(
pRow
),
pCols
->
maxCols
,
pCols
->
numOfCols
,
pCols
->
numOfRows
);
#endif
if
(
TD_IS_TP_ROW
(
pRow
))
{
return
tdAppendTpRowToDataCol
(
pRow
,
pSchema
,
pCols
,
isMerge
);
}
else
if
(
TD_IS_KV_ROW
(
pRow
))
{
return
tdAppendKvRowToDataCol
(
pRow
,
pSchema
,
pCols
,
isMerge
);
}
else
{
ASSERT
(
0
);
}
return
TSDB_CODE_SUCCESS
;
}
/**
* @brief source data has more priority than target
*
* @param target
* @param source
* @param rowsToMerge
* @param pOffset
* @param update
* @param maxVer
* @return int
*/
int
tdMergeDataCols
(
SDataCols
*
target
,
SDataCols
*
source
,
int
rowsToMerge
,
int
*
pOffset
,
bool
update
,
TDRowVerT
maxVer
)
{
ASSERT
(
rowsToMerge
>
0
&&
rowsToMerge
<=
source
->
numOfRows
);
ASSERT
(
target
->
numOfCols
==
source
->
numOfCols
);
int
offset
=
0
;
if
(
pOffset
==
NULL
)
{
pOffset
=
&
offset
;
}
SDataCols
*
pTarget
=
NULL
;
if
((
target
->
numOfRows
==
0
)
||
(
dataColsKeyLast
(
target
)
<
dataColsKeyAtRow
(
source
,
*
pOffset
)))
{
// No overlap
ASSERT
(
target
->
numOfRows
+
rowsToMerge
<=
target
->
maxPoints
);
// TODO: filter the maxVer
TSKEY
lastKey
=
TSKEY_INITIAL_VAL
;
for
(
int
i
=
0
;
i
<
rowsToMerge
;
++
i
)
{
bool
merge
=
false
;
for
(
int
j
=
0
;
j
<
source
->
numOfCols
;
j
++
)
{
if
(
source
->
cols
[
j
].
len
>
0
||
target
->
cols
[
j
].
len
>
0
)
{
SCellVal
sVal
=
{
0
};
if
(
tdGetColDataOfRow
(
&
sVal
,
source
->
cols
+
j
,
i
+
(
*
pOffset
),
source
->
bitmapMode
)
<
0
)
{
TASSERT
(
0
);
}
if
(
j
==
0
)
{
if
(
lastKey
==
*
(
TSKEY
*
)
sVal
.
val
)
{
if
(
!
update
)
{
break
;
}
merge
=
true
;
}
else
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
target
->
numOfRows
;
}
lastKey
=
*
(
TSKEY
*
)
sVal
.
val
;
}
if
(
i
==
0
)
{
(
target
->
cols
+
j
)
->
bitmap
=
(
source
->
cols
+
j
)
->
bitmap
;
}
tdAppendValToDataCol
(
target
->
cols
+
j
,
sVal
.
valType
,
sVal
.
val
,
target
->
numOfRows
,
target
->
maxPoints
,
target
->
bitmapMode
,
merge
);
}
}
}
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
target
->
numOfRows
;
}
(
*
pOffset
)
+=
rowsToMerge
;
}
else
{
pTarget
=
tdDupDataCols
(
target
,
true
);
if
(
pTarget
==
NULL
)
goto
_err
;
int
iter1
=
0
;
tdMergeTwoDataCols
(
target
,
pTarget
,
&
iter1
,
pTarget
->
numOfRows
,
source
,
pOffset
,
source
->
numOfRows
,
pTarget
->
numOfRows
+
rowsToMerge
,
update
);
}
tdFreeDataCols
(
pTarget
);
return
0
;
_err:
tdFreeDataCols
(
pTarget
);
return
-
1
;
}
static
void
tdAppendValToDataCols
(
SDataCols
*
target
,
SDataCols
*
src
,
int
iter
,
bool
isMerge
)
{
for
(
int
i
=
0
;
i
<
src
->
numOfCols
;
++
i
)
{
ASSERT
(
target
->
cols
[
i
].
type
==
src
->
cols
[
i
].
type
);
if
(
src
->
cols
[
i
].
len
>
0
||
target
->
cols
[
i
].
len
>
0
)
{
SCellVal
sVal
=
{
0
};
if
(
tdGetColDataOfRow
(
&
sVal
,
src
->
cols
+
i
,
iter
,
src
->
bitmapMode
)
<
0
)
{
TASSERT
(
0
);
}
if
(
isMerge
)
{
if
(
!
tdValTypeIsNone
(
sVal
.
valType
))
{
tdAppendValToDataCol
(
&
(
target
->
cols
[
i
]),
sVal
.
valType
,
sVal
.
val
,
target
->
numOfRows
,
target
->
maxPoints
,
target
->
bitmapMode
,
isMerge
);
}
else
{
// Keep the origin value for None
}
}
else
{
tdAppendValToDataCol
(
&
(
target
->
cols
[
i
]),
sVal
.
valType
,
sVal
.
val
,
target
->
numOfRows
,
target
->
maxPoints
,
target
->
bitmapMode
,
isMerge
);
}
}
}
}
/**
* @brief src2 data has more priority than src1
*
* @param target
* @param src1
* @param iter1
* @param limit1
* @param src2
* @param iter2
* @param limit2
* @param tRows
* @param update
*/
static
void
tdMergeTwoDataCols
(
SDataCols
*
target
,
SDataCols
*
src1
,
int
*
iter1
,
int
limit1
,
SDataCols
*
src2
,
int
*
iter2
,
int
limit2
,
int
tRows
,
bool
update
)
{
tdResetDataCols
(
target
);
target
->
bitmapMode
=
src1
->
bitmapMode
;
ASSERT
(
limit1
<=
src1
->
numOfRows
&&
limit2
<=
src2
->
numOfRows
);
int32_t
nRows
=
0
;
// TODO: filter the maxVer
// TODO: handle the delete function
TSKEY
lastKey
=
TSKEY_INITIAL_VAL
;
while
(
nRows
<
tRows
)
{
if
(
*
iter1
>=
limit1
&&
*
iter2
>=
limit2
)
break
;
TSKEY
key1
=
(
*
iter1
>=
limit1
)
?
INT64_MAX
:
dataColsKeyAt
(
src1
,
*
iter1
);
// TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1);
TSKEY
key2
=
(
*
iter2
>=
limit2
)
?
INT64_MAX
:
dataColsKeyAt
(
src2
,
*
iter2
);
// TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2);
// ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1)));
if
(
key1
<=
key2
)
{
// select key1 if not delete
if
(
update
&&
(
lastKey
==
key1
))
{
tdAppendValToDataCols
(
target
,
src1
,
*
iter1
,
true
);
}
else
if
(
lastKey
!=
key1
)
{
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
target
->
numOfRows
;
}
tdAppendValToDataCols
(
target
,
src1
,
*
iter1
,
false
);
}
++
nRows
;
++
(
*
iter1
);
lastKey
=
key1
;
}
else
{
// use key2 if not deleted
// TODO: handle the delete function
if
(
update
&&
(
lastKey
==
key2
))
{
tdAppendValToDataCols
(
target
,
src2
,
*
iter2
,
true
);
}
else
if
(
lastKey
!=
key2
)
{
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
target
->
numOfRows
;
}
tdAppendValToDataCols
(
target
,
src2
,
*
iter2
,
false
);
}
++
nRows
;
++
(
*
iter2
);
lastKey
=
key2
;
}
ASSERT
(
target
->
numOfRows
<=
target
->
maxPoints
-
1
);
}
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
target
->
numOfRows
;
}
}
STSRow
*
mergeTwoRows
(
void
*
buffer
,
STSRow
*
row1
,
STSRow
*
row2
,
STSchema
*
pSchema1
,
STSchema
*
pSchema2
)
{
#if 0
ASSERT(TD_ROW_KEY(row1) == TD_ROW_KEY(row2));
ASSERT(schemaVersion(pSchema1) == TD_ROW_SVER(row1));
ASSERT(schemaVersion(pSchema2) == TD_ROW_SVER(row2));
ASSERT(schemaVersion(pSchema1) >= schemaVersion(pSchema2));
#endif
#if 0
SArray *stashRow = taosArrayInit(pSchema1->numOfCols, sizeof(SColInfo));
if (stashRow == NULL) {
return NULL;
}
STSRow pRow = buffer;
STpRow dataRow = memRowDataBody(pRow);
memRowSetType(pRow, SMEM_ROW_DATA);
dataRowSetVersion(dataRow, schemaVersion(pSchema1)); // use latest schema version
dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen));
TDRowLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE;
int32_t i = 0; // row1
int32_t j = 0; // row2
int32_t nCols1 = schemaNCols(pSchema1);
int32_t nCols2 = schemaNCols(pSchema2);
SColInfo colInfo = {0};
int32_t kvIdx1 = 0, kvIdx2 = 0;
while (i < nCols1) {
STColumn *pCol = schemaColAt(pSchema1, i);
void * val1 = tdGetMemRowDataOfColEx(row1, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx1);
// if val1 != NULL, use val1;
if (val1 != NULL && !isNull(val1, pCol->type)) {
tdAppendColVal(dataRow, val1, pCol->type, pCol->offset);
kvLen += tdGetColAppendLen(SMEM_ROW_KV, val1, pCol->type);
setSColInfo(&colInfo, pCol->colId, pCol->type, val1);
taosArrayPush(stashRow, &colInfo);
++i; // next col
continue;
}
void *val2 = NULL;
while (j < nCols2) {
STColumn *tCol = schemaColAt(pSchema2, j);
if (tCol->colId < pCol->colId) {
++j;
continue;
}
if (tCol->colId == pCol->colId) {
val2 = tdGetMemRowDataOfColEx(row2, tCol->colId, tCol->type, TD_DATA_ROW_HEAD_SIZE + tCol->offset, &kvIdx2);
} else if (tCol->colId > pCol->colId) {
// set NULL
}
break;
} // end of while(j<nCols2)
if (val2 == NULL) {
val2 = (void *)getNullValue(pCol->type);
}
tdAppendColVal(dataRow, val2, pCol->type, pCol->offset);
if (!isNull(val2, pCol->type)) {
kvLen += tdGetColAppendLen(SMEM_ROW_KV, val2, pCol->type);
setSColInfo(&colInfo, pCol->colId, pCol->type, val2);
taosArrayPush(stashRow, &colInfo);
}
++i; // next col
}
dataLen = TD_ROW_LEN(pRow);
if (kvLen < dataLen) {
// scan stashRow and generate SKVRow
memset(buffer, 0, sizeof(dataLen));
STSRow tRow = buffer;
memRowSetType(tRow, SMEM_ROW_KV);
SKVRow kvRow = (SKVRow)memRowKvBody(tRow);
int16_t nKvNCols = (int16_t) taosArrayGetSize(stashRow);
kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nKvNCols));
kvRowSetNCols(kvRow, nKvNCols);
memRowSetKvVersion(tRow, pSchema1->version);
int32_t toffset = 0;
int16_t k;
for (k = 0; k < nKvNCols; ++k) {
SColInfo *pColInfo = taosArrayGet(stashRow, k);
tdAppendKvColVal(kvRow, pColInfo->colVal, true, pColInfo->colId, pColInfo->colType, toffset);
toffset += sizeof(SColIdx);
}
ASSERT(kvLen == TD_ROW_LEN(tRow));
}
taosArrayDestroy(stashRow);
return buffer;
#endif
return
NULL
;
}
SDataCols
*
tdDupDataCols
(
SDataCols
*
pDataCols
,
bool
keepData
)
{
SDataCols
*
pRet
=
tdNewDataCols
(
pDataCols
->
maxCols
,
pDataCols
->
maxPoints
);
if
(
pRet
==
NULL
)
return
NULL
;
pRet
->
numOfCols
=
pDataCols
->
numOfCols
;
pRet
->
bitmapMode
=
pDataCols
->
bitmapMode
;
pRet
->
sversion
=
pDataCols
->
sversion
;
if
(
keepData
)
pRet
->
numOfRows
=
pDataCols
->
numOfRows
;
for
(
int
i
=
0
;
i
<
pDataCols
->
numOfCols
;
++
i
)
{
pRet
->
cols
[
i
].
type
=
pDataCols
->
cols
[
i
].
type
;
pRet
->
cols
[
i
].
bitmap
=
pDataCols
->
cols
[
i
].
bitmap
;
pRet
->
cols
[
i
].
colId
=
pDataCols
->
cols
[
i
].
colId
;
pRet
->
cols
[
i
].
bytes
=
pDataCols
->
cols
[
i
].
bytes
;
pRet
->
cols
[
i
].
offset
=
pDataCols
->
cols
[
i
].
offset
;
if
(
keepData
)
{
if
(
pDataCols
->
cols
[
i
].
len
>
0
)
{
if
(
tdAllocMemForCol
(
&
pRet
->
cols
[
i
],
pRet
->
maxPoints
)
<
0
)
{
tdFreeDataCols
(
pRet
);
return
NULL
;
}
pRet
->
cols
[
i
].
len
=
pDataCols
->
cols
[
i
].
len
;
memcpy
(
pRet
->
cols
[
i
].
pData
,
pDataCols
->
cols
[
i
].
pData
,
pDataCols
->
cols
[
i
].
len
);
if
(
IS_VAR_DATA_TYPE
(
pRet
->
cols
[
i
].
type
))
{
int
dataOffSize
=
sizeof
(
VarDataOffsetT
)
*
pDataCols
->
maxPoints
;
memcpy
(
pRet
->
cols
[
i
].
dataOff
,
pDataCols
->
cols
[
i
].
dataOff
,
dataOffSize
);
}
if
(
!
TD_COL_ROWS_NORM
(
pRet
->
cols
+
i
))
{
memcpy
(
pRet
->
cols
[
i
].
pBitmap
,
pDataCols
->
cols
[
i
].
pBitmap
,
TD_BITMAP_BYTES
(
pDataCols
->
numOfRows
));
}
}
}
}
return
pRet
;
}
void
tdSRowPrint
(
STSRow
*
row
,
STSchema
*
pSchema
,
const
char
*
tag
)
{
STSRowIter
iter
=
{
0
};
tdSTSRowIterInit
(
&
iter
,
pSchema
);
...
...
@@ -1020,32 +420,6 @@ void tdSCellValPrint(SCellVal *pVal, int8_t colType) {
}
}
int32_t
dataColGetNEleLen
(
SDataCol
*
pDataCol
,
int32_t
rows
,
int8_t
bitmapMode
)
{
ASSERT
(
rows
>
0
);
int32_t
result
=
0
;
if
(
IS_VAR_DATA_TYPE
(
pDataCol
->
type
))
{
result
+=
pDataCol
->
dataOff
[
rows
-
1
];
SCellVal
val
=
{
0
};
if
(
tdGetColDataOfRow
(
&
val
,
pDataCol
,
rows
-
1
,
bitmapMode
)
<
0
)
{
TASSERT
(
0
);
}
// Currently, count the varDataTLen in of Null/None cols considering back compatibility test for 2.4
result
+=
varDataTLen
(
val
.
val
);
// TODO: later on, don't save Null/None for VarDataT for 3.0
// if (tdValTypeIsNorm(val.valType)) {
// result += varDataTLen(val.val);
// }
}
else
{
result
+=
TYPE_BYTES
[
pDataCol
->
type
]
*
rows
;
}
ASSERT
(
pDataCol
->
len
==
result
);
return
result
;
}
bool
tdSKvRowGetVal
(
STSRow
*
pRow
,
col_id_t
colId
,
col_id_t
colIdx
,
SCellVal
*
pVal
)
{
if
(
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
tdRowSetVal
(
pVal
,
TD_VTYPE_NORM
,
TD_ROW_KEY_ADDR
(
pRow
));
...
...
@@ -1082,40 +456,6 @@ bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t fl
return
true
;
}
int32_t
tdGetColDataOfRow
(
SCellVal
*
pVal
,
SDataCol
*
pCol
,
int32_t
row
,
int8_t
bitmapMode
)
{
if
(
isAllRowsNone
(
pCol
))
{
pVal
->
valType
=
TD_VTYPE_NONE
;
#ifdef TD_SUPPORT_READ2
pVal
->
val
=
(
void
*
)
getNullValue
(
pCol
->
type
);
#else
pVal
->
val
=
NULL
;
#endif
return
TSDB_CODE_SUCCESS
;
}
if
(
TD_COL_ROWS_NORM
(
pCol
))
{
pVal
->
valType
=
TD_VTYPE_NORM
;
}
else
if
(
tdGetBitmapValType
(
pCol
->
pBitmap
,
row
,
&
(
pVal
->
valType
),
bitmapMode
)
<
0
)
{
return
terrno
;
}
if
(
tdValTypeIsNorm
(
pVal
->
valType
))
{
if
(
IS_VAR_DATA_TYPE
(
pCol
->
type
))
{
pVal
->
val
=
POINTER_SHIFT
(
pCol
->
pData
,
pCol
->
dataOff
[
row
]);
}
else
{
pVal
->
val
=
POINTER_SHIFT
(
pCol
->
pData
,
TYPE_BYTES
[
pCol
->
type
]
*
row
);
}
}
else
{
pVal
->
valType
=
TD_VTYPE_NULL
;
#ifdef TD_SUPPORT_READ2
pVal
->
val
=
(
void
*
)
getNullValue
(
pCol
->
type
);
#else
pVal
->
val
=
NULL
;
#endif
}
return
TSDB_CODE_SUCCESS
;
}
bool
tdSTSRowIterNext
(
STSRowIter
*
pIter
,
col_id_t
colId
,
col_type_t
colType
,
SCellVal
*
pVal
)
{
if
(
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
pVal
->
val
=
&
pIter
->
pRow
->
ts
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录