Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
1800c2e8
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1800c2e8
编写于
7月 19, 2021
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
support payload type for both prepare and non-prepare mode
上级
bd384300
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
79 addition
and
246 deletion
+79
-246
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+1
-0
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+9
-0
src/client/src/tscParseInsert.c
src/client/src/tscParseInsert.c
+1
-3
src/client/src/tscPrepare.c
src/client/src/tscPrepare.c
+10
-215
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+58
-28
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
1800c2e8
...
...
@@ -108,6 +108,7 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
int32_t
tscCreateDataBlock
(
size_t
initialSize
,
int32_t
rowSize
,
int32_t
startOffset
,
SName
*
name
,
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
);
void
tscDestroyDataBlock
(
STableDataBlocks
*
pDataBlock
,
bool
removeMeta
);
void
tscSortRemoveDataBlockDupRowsRaw
(
STableDataBlocks
*
dataBuf
);
int
tscSortRemoveDataBlockDupRows
(
STableDataBlocks
*
dataBuf
,
SBlockKeyInfo
*
pBlkKeyInfo
);
void
tscDestroyBoundColumnInfo
(
SParsedDataColInfo
*
pColInfo
);
...
...
src/client/inc/tsclient.h
浏览文件 @
1800c2e8
...
...
@@ -159,6 +159,7 @@ typedef struct SInsertStatementParam {
SHashObj
*
pTableBlockHashList
;
// data block for each table
SArray
*
pDataBlocks
;
// SArray<STableDataBlocks*>. Merged submit block for each vgroup
int8_t
schemaAttached
;
// denote if submit block is built with table schema or not
uint8_t
payloadType
;
// EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
STagData
tagData
;
// NOTE: pTagData->data is used as a variant length array
int32_t
batchSize
;
// for parameter ('?') binding and batch processing
...
...
@@ -170,6 +171,14 @@ typedef struct SInsertStatementParam {
char
*
sql
;
// current sql statement position
}
SInsertStatementParam
;
typedef
enum
{
PAYLOAD_TYPE_KV
=
0
,
PAYLOAD_TYPE_RAW
=
1
,
}
EPayloadType
;
#define IS_RAW_PAYLOAD(t) \
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
// TODO extract sql parser supporter
typedef
struct
{
int
command
;
...
...
src/client/src/tscParseInsert.c
浏览文件 @
1800c2e8
...
...
@@ -1069,9 +1069,8 @@ static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta,
}
}
#if 0
// data block is disordered, sort it in ascending order
static void tscSortRemoveDataBlockDupRowsOld
(STableDataBlocks *dataBuf) {
void
tscSortRemoveDataBlockDupRowsRaw
(
STableDataBlocks
*
dataBuf
)
{
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)
dataBuf
->
pData
;
// size is less than the total size, since duplicated rows may be removed yet.
...
...
@@ -1114,7 +1113,6 @@ static void tscSortRemoveDataBlockDupRowsOld(STableDataBlocks *dataBuf) {
dataBuf
->
prevTS
=
INT64_MIN
;
}
#endif
// data block is disordered, sort it in ascending order
int
tscSortRemoveDataBlockDupRows
(
STableDataBlocks
*
dataBuf
,
SBlockKeyInfo
*
pBlkKeyInfo
)
{
...
...
src/client/src/tscPrepare.c
浏览文件 @
1800c2e8
...
...
@@ -291,7 +291,6 @@ static char* normalStmtBuildSql(STscStmt* stmt) {
return
taosStringBuilderGetResult
(
&
sb
,
NULL
);
}
#if 0
static
int
fillColumnsNull
(
STableDataBlocks
*
pBlock
,
int32_t
rowNum
)
{
SParsedDataColInfo
*
spd
=
&
pBlock
->
boundColumnInfo
;
int32_t
offset
=
0
;
...
...
@@ -319,129 +318,8 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) {
return
TSDB_CODE_SUCCESS
;
}
#endif
/**
* input:
* - schema:
* - payload:
* - spd:
* output:
* - pBlock with data block replaced by K-V format
*/
static
int
refactorPayload
(
STableDataBlocks
*
pBlock
,
int32_t
rowNum
)
{
SParsedDataColInfo
*
spd
=
&
pBlock
->
boundColumnInfo
;
SSchema
*
schema
=
(
SSchema
*
)
pBlock
->
pTableMeta
->
schema
;
SMemRowHelper
*
pHelper
=
&
pBlock
->
rowHelper
;
STableMeta
*
pTableMeta
=
pBlock
->
pTableMeta
;
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMeta
);
int
code
=
TSDB_CODE_SUCCESS
;
int32_t
extendedRowSize
=
getExtendedRowSize
(
&
tinfo
);
TDRowTLenT
destPayloadSize
=
sizeof
(
SSubmitBlk
);
ASSERT
(
pHelper
->
allNullLen
>=
8
);
TDRowTLenT
destAllocSize
=
sizeof
(
SSubmitBlk
)
+
rowNum
*
extendedRowSize
;
SSubmitBlk
*
pDestBlock
=
tcalloc
(
destAllocSize
,
1
);
if
(
pDestBlock
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
memcpy
(
pDestBlock
,
pBlock
->
pData
,
sizeof
(
SSubmitBlk
));
char
*
destPayload
=
(
char
*
)
pDestBlock
+
sizeof
(
SSubmitBlk
);
char
*
srcPayload
=
(
char
*
)
pBlock
->
pData
+
sizeof
(
SSubmitBlk
);
for
(
int
n
=
0
;
n
<
rowNum
;
++
n
)
{
payloadSetNCols
(
destPayload
,
spd
->
numOfBound
);
TDRowTLenT
dataRowLen
=
pHelper
->
allNullLen
;
TDRowTLenT
kvRowLen
=
TD_MEM_ROW_KV_VER_SIZE
+
sizeof
(
SColIdx
)
*
spd
->
numOfBound
;
TDRowTLenT
payloadValOffset
=
payloadValuesOffset
(
destPayload
);
// rely on payloadNCols
TDRowLenT
colValOffset
=
0
;
char
*
kvPrimaryKeyStart
=
destPayload
+
PAYLOAD_HEADER_LEN
;
// primaryKey in 1st column tuple
char
*
kvStart
=
kvPrimaryKeyStart
+
PAYLOAD_COL_HEAD_LEN
;
// the column tuple behind the primaryKey
for
(
int32_t
i
=
0
;
i
<
spd
->
numOfBound
;
++
i
)
{
int32_t
colIndex
=
spd
->
boundedColumns
[
i
];
ASSERT
(
spd
->
cols
[
colIndex
].
hasVal
);
char
*
start
=
srcPayload
+
spd
->
cols
[
colIndex
].
offset
;
SSchema
*
pSchema
=
&
schema
[
colIndex
];
// get colId here
bool
isPrimaryKey
=
(
colIndex
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
);
// the primary key locates in 1st column
if
(
!
IS_DATA_COL_ORDERED
(
spd
->
orderStatus
))
{
ASSERT
(
spd
->
colIdxInfo
!=
NULL
);
if
(
!
isPrimaryKey
)
{
kvStart
=
POINTER_SHIFT
(
kvPrimaryKeyStart
,
spd
->
colIdxInfo
[
i
].
finalIdx
*
PAYLOAD_COL_HEAD_LEN
);
}
else
{
ASSERT
(
spd
->
colIdxInfo
[
i
].
finalIdx
==
0
);
}
}
if
(
isPrimaryKey
)
{
payloadColSetId
(
kvPrimaryKeyStart
,
pSchema
->
colId
);
payloadColSetType
(
kvPrimaryKeyStart
,
pSchema
->
type
);
payloadColSetOffset
(
kvPrimaryKeyStart
,
colValOffset
);
memcpy
(
POINTER_SHIFT
(
destPayload
,
payloadValOffset
+
colValOffset
),
start
,
TYPE_BYTES
[
pSchema
->
type
]);
colValOffset
+=
TYPE_BYTES
[
pSchema
->
type
];
kvRowLen
+=
TYPE_BYTES
[
pSchema
->
type
];
}
else
{
payloadColSetId
(
kvStart
,
pSchema
->
colId
);
payloadColSetType
(
kvStart
,
pSchema
->
type
);
payloadColSetOffset
(
kvStart
,
colValOffset
);
if
(
IS_VAR_DATA_TYPE
(
pSchema
->
type
))
{
varDataCopy
(
POINTER_SHIFT
(
destPayload
,
payloadValOffset
+
colValOffset
),
start
);
colValOffset
+=
varDataTLen
(
start
);
kvRowLen
+=
varDataTLen
(
start
);
if
(
pSchema
->
type
==
TSDB_DATA_TYPE_BINARY
)
{
dataRowLen
+=
(
varDataLen
(
start
)
-
CHAR_BYTES
);
}
else
if
(
pSchema
->
type
==
TSDB_DATA_TYPE_NCHAR
)
{
dataRowLen
+=
(
varDataLen
(
start
)
-
TSDB_NCHAR_SIZE
);
}
else
{
ASSERT
(
0
);
}
}
else
{
memcpy
(
POINTER_SHIFT
(
destPayload
,
payloadValOffset
+
colValOffset
),
start
,
TYPE_BYTES
[
pSchema
->
type
]);
colValOffset
+=
TYPE_BYTES
[
pSchema
->
type
];
kvRowLen
+=
TYPE_BYTES
[
pSchema
->
type
];
}
if
(
IS_DATA_COL_ORDERED
(
spd
->
orderStatus
))
{
kvStart
+=
PAYLOAD_COL_HEAD_LEN
;
// move to next column
}
}
}
// end of column
if
(
kvRowLen
<
dataRowLen
)
{
payloadSetType
(
destPayload
,
SMEM_ROW_KV
);
}
else
{
payloadSetType
(
destPayload
,
SMEM_ROW_DATA
);
}
ASSERT
(
colValOffset
<=
TSDB_MAX_BYTES_PER_ROW
);
TDRowTLenT
len
=
payloadValOffset
+
colValOffset
;
payloadSetTLen
(
destPayload
,
len
);
// next loop
srcPayload
+=
pBlock
->
rowSize
;
destPayload
+=
len
;
destPayloadSize
+=
len
;
}
// end of row
ASSERT
(
destPayloadSize
<=
destAllocSize
);
tfree
(
pBlock
->
pData
);
pBlock
->
pData
=
(
char
*
)
pDestBlock
;
pBlock
->
nAllocSize
=
destAllocSize
;
pBlock
->
size
=
destPayloadSize
;
return
code
;
}
#if 0
int32_t
fillTablesColumnsNull
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -464,98 +342,17 @@ int32_t fillTablesColumnsNull(SSqlObj* pSql) {
return
TSDB_CODE_SUCCESS
;
}
#endif
/**
* check and sort
*/
static
int
initPayloadEnv
(
STableDataBlocks
*
pBlock
,
int32_t
rowNum
)
{
SParsedDataColInfo
*
spd
=
&
pBlock
->
boundColumnInfo
;
if
(
spd
->
orderStatus
!=
ORDER_STATUS_UNKNOWN
)
{
return
TSDB_CODE_SUCCESS
;
}
bool
isOrdered
=
true
;
int32_t
lastColIdx
=
-
1
;
for
(
int32_t
i
=
0
;
i
<
spd
->
numOfBound
;
++
i
)
{
ASSERT
(
spd
->
cols
[
i
].
hasVal
);
int32_t
colIdx
=
spd
->
boundedColumns
[
i
];
if
(
isOrdered
)
{
if
(
lastColIdx
>
colIdx
)
{
isOrdered
=
false
;
break
;
}
else
{
lastColIdx
=
colIdx
;
}
}
}
spd
->
orderStatus
=
isOrdered
?
ORDER_STATUS_ORDERED
:
ORDER_STATUS_DISORDERED
;
if
(
isOrdered
)
{
spd
->
colIdxInfo
=
NULL
;
}
else
{
spd
->
colIdxInfo
=
calloc
(
spd
->
numOfBound
,
sizeof
(
SBoundIdxInfo
));
if
(
spd
->
colIdxInfo
==
NULL
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
SBoundIdxInfo
*
pColIdx
=
spd
->
colIdxInfo
;
for
(
uint16_t
i
=
0
;
i
<
spd
->
numOfBound
;
++
i
)
{
pColIdx
[
i
].
schemaColIdx
=
(
uint16_t
)
spd
->
boundedColumns
[
i
];
pColIdx
[
i
].
boundIdx
=
i
;
}
qsort
(
pColIdx
,
spd
->
numOfBound
,
sizeof
(
SBoundIdxInfo
),
schemaIdxCompar
);
for
(
uint16_t
i
=
0
;
i
<
spd
->
numOfBound
;
++
i
)
{
pColIdx
[
i
].
finalIdx
=
i
;
}
qsort
(
pColIdx
,
spd
->
numOfBound
,
sizeof
(
SBoundIdxInfo
),
boundIdxCompar
);
}
return
TSDB_CODE_SUCCESS
;
}
/**
* Refactor the raw payload structure to K-V format as the in tsParseOneRow()
*/
int32_t
fillTablesPayload
(
SSqlObj
*
pSql
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
int
code
=
TSDB_CODE_SUCCESS
;
STableDataBlocks
**
p
=
taosHashIterate
(
pCmd
->
insertParam
.
pTableBlockHashList
,
NULL
);
STableDataBlocks
*
pOneTableBlock
=
*
p
;
while
(
pOneTableBlock
)
{
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)
pOneTableBlock
->
pData
;
if
(
pBlocks
->
numOfRows
>
0
)
{
initSMemRowHelper
(
&
pOneTableBlock
->
rowHelper
,
tscGetTableSchema
(
pOneTableBlock
->
pTableMeta
),
tscGetNumOfColumns
(
pOneTableBlock
->
pTableMeta
),
0
);
if
((
code
=
initPayloadEnv
(
pOneTableBlock
,
pBlocks
->
numOfRows
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
((
code
=
refactorPayload
(
pOneTableBlock
,
pBlocks
->
numOfRows
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
};
}
p
=
taosHashIterate
(
pCmd
->
insertParam
.
pTableBlockHashList
,
p
);
if
(
p
==
NULL
)
{
break
;
}
pOneTableBlock
=
*
p
;
////////////////////////////////////////////////////////////////////////////////
// functions for insertion statement preparation
static
int
doBindParam
(
STableDataBlocks
*
pBlock
,
char
*
data
,
SParamInfo
*
param
,
TAOS_BIND
*
bind
,
int32_t
colNum
)
{
if
(
bind
->
is_null
!=
NULL
&&
*
(
bind
->
is_null
))
{
setNull
(
data
+
param
->
offset
,
param
->
type
,
param
->
bytes
);
return
TSDB_CODE_SUCCESS
;
}
return
code
;
}
////////////////////////////////////////////////////////////////////////////////
// functions for insertion statement preparation
static
int
doBindParam
(
STableDataBlocks
*
pBlock
,
char
*
data
,
SParamInfo
*
param
,
TAOS_BIND
*
bind
,
int32_t
colNum
)
{
if
(
bind
->
is_null
!=
NULL
&&
*
(
bind
->
is_null
))
{
setNull
(
data
+
param
->
offset
,
param
->
type
,
param
->
bytes
);
return
TSDB_CODE_SUCCESS
;
}
#if 0
if (0) {
// allow user bind param data with different type
...
...
@@ -1309,12 +1106,9 @@ static int insertStmtExecute(STscStmt* stmt) {
pBlk
->
uid
=
pTableMeta
->
id
.
uid
;
pBlk
->
tid
=
pTableMeta
->
id
.
tid
;
int
code
=
fillTablesPayload
(
stmt
->
pSql
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
fillTablesColumnsNull
(
stmt
->
pSql
);
code
=
tscMergeTableDataBlocks
(
&
stmt
->
pSql
->
cmd
.
insertParam
,
false
);
int
code
=
tscMergeTableDataBlocks
(
&
stmt
->
pSql
->
cmd
.
insertParam
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -1391,7 +1185,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
return
TSDB_CODE_TSC_APP_ERROR
;
}
fillTables
Payload
(
pStmt
->
pSql
);
fillTables
ColumnsNull
(
pStmt
->
pSql
);
if
((
code
=
tscMergeTableDataBlocks
(
&
pStmt
->
pSql
->
cmd
.
insertParam
,
false
))
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -2012,6 +1806,7 @@ int taos_stmt_execute(TAOS_STMT* stmt) {
pStmt
->
last
=
STMT_EXECUTE
;
pStmt
->
pSql
->
cmd
.
insertParam
.
payloadType
=
PAYLOAD_TYPE_RAW
;
if
(
pStmt
->
multiTbInsert
)
{
ret
=
insertBatchStmtExecute
(
pStmt
);
}
else
{
...
...
src/client/src/tscUtil.c
浏览文件 @
1800c2e8
...
...
@@ -1860,7 +1860,8 @@ static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
}
// Erase the empty space reserved for binary data
static
int
trimDataBlock
(
void
*
pDataBlock
,
STableDataBlocks
*
pTableDataBlock
,
bool
includeSchema
,
SBlockKeyTuple
*
blkKeyTuple
)
{
static
int
trimDataBlock
(
void
*
pDataBlock
,
STableDataBlocks
*
pTableDataBlock
,
SInsertStatementParam
*
insertParam
,
SBlockKeyTuple
*
blkKeyTuple
)
{
// TODO: optimize this function, handle the case while binary is not presented
STableMeta
*
pTableMeta
=
pTableDataBlock
->
pTableMeta
;
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMeta
);
...
...
@@ -1873,7 +1874,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
int32_t
flen
=
0
;
// original total length of row
// schema needs to be included into the submit data block
if
(
in
cludeSchema
)
{
if
(
in
sertParam
->
schemaAttached
)
{
int32_t
numOfCols
=
tscGetNumOfColumns
(
pTableDataBlock
->
pTableMeta
);
for
(
int32_t
j
=
0
;
j
<
numOfCols
;
++
j
)
{
STColumn
*
pCol
=
(
STColumn
*
)
pDataBlock
;
...
...
@@ -1900,18 +1901,38 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
pBlock
->
dataLen
=
0
;
int32_t
numOfRows
=
htons
(
pBlock
->
numOfRows
);
SMemRowBuilder
rowBuilder
;
rowBuilder
.
pSchema
=
pSchema
;
rowBuilder
.
sversion
=
pTableMeta
->
sversion
;
rowBuilder
.
flen
=
flen
;
rowBuilder
.
nCols
=
tinfo
.
numOfColumns
;
rowBuilder
.
pDataBlock
=
pDataBlock
;
rowBuilder
.
pSubmitBlk
=
pBlock
;
rowBuilder
.
buf
=
p
;
if
(
IS_RAW_PAYLOAD
(
insertParam
->
payloadType
))
{
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
SMemRow
memRow
=
(
SMemRow
)
pDataBlock
;
memRowSetType
(
memRow
,
SMEM_ROW_DATA
);
SDataRow
trow
=
memRowDataBody
(
memRow
);
dataRowSetLen
(
trow
,
(
uint16_t
)(
TD_DATA_ROW_HEAD_SIZE
+
flen
));
dataRowSetVersion
(
trow
,
pTableMeta
->
sversion
);
int
toffset
=
0
;
for
(
int32_t
j
=
0
;
j
<
tinfo
.
numOfColumns
;
j
++
)
{
tdAppendColVal
(
trow
,
p
,
pSchema
[
j
].
type
,
toffset
);
toffset
+=
TYPE_BYTES
[
pSchema
[
j
].
type
];
p
+=
pSchema
[
j
].
bytes
;
}
pDataBlock
=
(
char
*
)
pDataBlock
+
memRowTLen
(
memRow
);
pBlock
->
dataLen
+=
memRowTLen
(
memRow
);
}
}
else
{
SMemRowBuilder
rowBuilder
;
rowBuilder
.
pSchema
=
pSchema
;
rowBuilder
.
sversion
=
pTableMeta
->
sversion
;
rowBuilder
.
flen
=
flen
;
rowBuilder
.
nCols
=
tinfo
.
numOfColumns
;
rowBuilder
.
pDataBlock
=
pDataBlock
;
rowBuilder
.
pSubmitBlk
=
pBlock
;
rowBuilder
.
buf
=
p
;
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
rowBuilder
.
buf
=
(
blkKeyTuple
+
i
)
->
payloadAddr
;
tdGenMemRowFromBuilder
(
&
rowBuilder
);
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
rowBuilder
.
buf
=
(
blkKeyTuple
+
i
)
->
payloadAddr
;
tdGenMemRowFromBuilder
(
&
rowBuilder
);
}
}
int32_t
len
=
pBlock
->
dataLen
+
pBlock
->
schemaLen
;
...
...
@@ -1959,6 +1980,7 @@ static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeB
int32_t
tscMergeTableDataBlocks
(
SInsertStatementParam
*
pInsertParam
,
bool
freeBlockMap
)
{
const
int
INSERT_HEAD_SIZE
=
sizeof
(
SMsgDesc
)
+
sizeof
(
SSubmitMsg
);
int
code
=
0
;
bool
isRawPayload
=
IS_RAW_PAYLOAD
(
pInsertParam
->
payloadType
);
void
*
pVnodeDataBlockHashList
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
SArray
*
pVnodeDataBlockList
=
taosArrayInit
(
8
,
POINTER_BYTES
);
...
...
@@ -1967,7 +1989,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
STableDataBlocks
*
pOneTableBlock
=
*
p
;
SBlockKeyInfo
blkKeyInfo
=
{
0
};
// share by pOneTableBlock
while
(
pOneTableBlock
)
{
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)
pOneTableBlock
->
pData
;
if
(
pBlocks
->
numOfRows
>
0
)
{
...
...
@@ -2008,21 +2030,29 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
}
}
if
((
code
=
tscSortRemoveDataBlockDupRows
(
pOneTableBlock
,
&
blkKeyInfo
))
!=
0
)
{
taosHashCleanup
(
pVnodeDataBlockHashList
);
tscDestroyBlockArrayList
(
pVnodeDataBlockList
);
tfree
(
dataBuf
->
pData
);
tfree
(
blkKeyInfo
.
pKeyTuple
);
return
code
;
}
ASSERT
(
blkKeyInfo
.
pKeyTuple
!=
NULL
&&
pBlocks
->
numOfRows
>
0
);
if
(
isRawPayload
)
{
tscSortRemoveDataBlockDupRowsRaw
(
pOneTableBlock
);
char
*
ekey
=
(
char
*
)
pBlocks
->
data
+
pOneTableBlock
->
rowSize
*
(
pBlocks
->
numOfRows
-
1
);
SBlockKeyTuple
*
pLastKeyTuple
=
blkKeyInfo
.
pKeyTuple
+
pBlocks
->
numOfRows
-
1
;
tscDebug
(
"0x%"
PRIx64
" name:%s, tid:%d rows:%d sversion:%d skey:%"
PRId64
", ekey:%"
PRId64
,
pInsertParam
->
objectId
,
tNameGetTableName
(
&
pOneTableBlock
->
tableName
),
pBlocks
->
tid
,
pBlocks
->
numOfRows
,
pBlocks
->
sversion
,
blkKeyInfo
.
pKeyTuple
->
skey
,
pLastKeyTuple
->
skey
);
tscDebug
(
"0x%"
PRIx64
" name:%s, tid:%d rows:%d sversion:%d skey:%"
PRId64
", ekey:%"
PRId64
,
pInsertParam
->
objectId
,
tNameGetTableName
(
&
pOneTableBlock
->
tableName
),
pBlocks
->
tid
,
pBlocks
->
numOfRows
,
pBlocks
->
sversion
,
GET_INT64_VAL
(
pBlocks
->
data
),
GET_INT64_VAL
(
ekey
));
}
else
{
if
((
code
=
tscSortRemoveDataBlockDupRows
(
pOneTableBlock
,
&
blkKeyInfo
))
!=
0
)
{
taosHashCleanup
(
pVnodeDataBlockHashList
);
tscDestroyBlockArrayList
(
pVnodeDataBlockList
);
tfree
(
dataBuf
->
pData
);
tfree
(
blkKeyInfo
.
pKeyTuple
);
return
code
;
}
ASSERT
(
blkKeyInfo
.
pKeyTuple
!=
NULL
&&
pBlocks
->
numOfRows
>
0
);
SBlockKeyTuple
*
pLastKeyTuple
=
blkKeyInfo
.
pKeyTuple
+
pBlocks
->
numOfRows
-
1
;
tscDebug
(
"0x%"
PRIx64
" name:%s, tid:%d rows:%d sversion:%d skey:%"
PRId64
", ekey:%"
PRId64
,
pInsertParam
->
objectId
,
tNameGetTableName
(
&
pOneTableBlock
->
tableName
),
pBlocks
->
tid
,
pBlocks
->
numOfRows
,
pBlocks
->
sversion
,
blkKeyInfo
.
pKeyTuple
->
skey
,
pLastKeyTuple
->
skey
);
}
int32_t
len
=
pBlocks
->
numOfRows
*
(
pOneTableBlock
->
rowSize
+
expandSize
)
+
sizeof
(
STColumn
)
*
tscGetNumOfColumns
(
pOneTableBlock
->
pTableMeta
);
pBlocks
->
tid
=
htonl
(
pBlocks
->
tid
);
...
...
@@ -2032,7 +2062,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
pBlocks
->
schemaLen
=
0
;
// erase the empty space reserved for binary data
int32_t
finalLen
=
trimDataBlock
(
dataBuf
->
pData
+
dataBuf
->
size
,
pOneTableBlock
,
pInsertParam
->
schemaAttached
,
blkKeyInfo
.
pKeyTuple
);
int32_t
finalLen
=
trimDataBlock
(
dataBuf
->
pData
+
dataBuf
->
size
,
pOneTableBlock
,
pInsertParam
,
blkKeyInfo
.
pKeyTuple
);
assert
(
finalLen
<=
len
);
dataBuf
->
size
+=
(
finalLen
+
sizeof
(
SSubmitBlk
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录