Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
13a6a0a3
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
13a6a0a3
编写于
9月 22, 2022
作者:
Z
zhihaop
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: merge the SSubmitBlks and sort the SMemRows before sending to vnodes to increase performance
上级
6b7ea1d2
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
376 addition
and
38 deletion
+376
-38
src/client/inc/tscBulkWrite.h
src/client/inc/tscBulkWrite.h
+1
-0
src/client/src/tscBulkWrite.c
src/client/src/tscBulkWrite.c
+5
-0
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+370
-38
未找到文件。
src/client/inc/tscBulkWrite.h
浏览文件 @
13a6a0a3
...
@@ -101,6 +101,7 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher);
...
@@ -101,6 +101,7 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher);
* 1. auto batch feature on the sql object must be enabled.
* 1. auto batch feature on the sql object must be enabled.
* 2. must be an `insert into ... value ...` statement.
* 2. must be an `insert into ... value ...` statement.
* 3. the payload type must be kv payload.
* 3. the payload type must be kv payload.
* 4. no schema attached.
*
*
* @param dispatcher the async dispatcher.
* @param dispatcher the async dispatcher.
* @param pSql the sql object to check.
* @param pSql the sql object to check.
...
...
src/client/src/tscBulkWrite.c
浏览文件 @
13a6a0a3
...
@@ -431,6 +431,11 @@ bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSq
...
@@ -431,6 +431,11 @@ bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSq
return
false
;
return
false
;
}
}
// no schema attached.
if
(
pInsertParam
->
schemaAttached
)
{
return
false
;
}
// too many insertion rows, fail back to normal insertion.
// too many insertion rows, fail back to normal insertion.
if
(
statementGetInsertionRows
(
pSql
)
>=
dispatcher
->
batchSize
)
{
if
(
statementGetInsertionRows
(
pSql
)
>=
dispatcher
->
batchSize
)
{
return
false
;
return
false
;
...
...
src/client/src/tscUtil.c
浏览文件 @
13a6a0a3
...
@@ -2198,25 +2198,314 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
...
@@ -2198,25 +2198,314 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return
result
;
return
result
;
}
}
static
void
extractTableNameList
(
SSqlObj
*
pSql
,
SInsertStatementParam
*
pInsertParam
)
{
static
void
extractTableNameList
(
SSqlObj
*
pSql
,
SInsertStatementParam
*
pInsertParam
,
SArray
*
pTableDataBlockList
)
{
pInsertParam
->
numOfTables
=
(
int32_t
)
taosHashGetSize
(
pInsertParam
->
pTableBlockHashList
);
pInsertParam
->
numOfTables
=
(
int32_t
)
taosHashGetSize
(
pInsertParam
->
pTableBlockHashList
);
if
(
pInsertParam
->
pTableNameList
==
NULL
)
{
if
(
pInsertParam
->
pTableNameList
==
NULL
)
{
pInsertParam
->
pTableNameList
=
malloc
(
pInsertParam
->
numOfTables
*
POINTER_BYTES
);
pInsertParam
->
pTableNameList
=
malloc
(
pInsertParam
->
numOfTables
*
POINTER_BYTES
);
}
}
STableDataBlocks
**
p1
=
taosHashIterate
(
pInsertParam
->
pTableBlockHashList
,
NULL
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTableDataBlockList
);
++
i
)
{
int32_t
i
=
0
;
STableDataBlocks
*
pBlocks
=
*
((
STableDataBlocks
**
)
taosArrayGet
(
pTableDataBlockList
,
i
));
while
(
p1
)
{
STableDataBlocks
*
pBlocks
=
*
p1
;
//tfree(pInsertParam->pTableNameList[i]);
//tfree(pInsertParam->pTableNameList[i]);
pInsertParam
->
pTableNameList
[
i
++
]
=
tNameDup
(
&
pBlocks
->
tableName
);
pInsertParam
->
pTableNameList
[
i
]
=
tNameDup
(
&
pBlocks
->
tableName
);
p1
=
taosHashIterate
(
pInsertParam
->
pTableBlockHashList
,
p1
);
}
}
/**
* Resize the the data blocks data.
*
* @param dataBlocks the data blocks.
* @param destSize the destination size.
* @return whether is success.
*/
inline
static
bool
resizeDataBlocksData
(
STableDataBlocks
*
dataBlocks
,
size_t
destSize
)
{
if
(
dataBlocks
->
nAllocSize
>=
destSize
)
{
return
true
;
}
size_t
nAllocSize
=
destSize
+
(
destSize
>>
1
);
char
*
pData
=
realloc
(
dataBlocks
->
pData
,
dataBlocks
->
nAllocSize
);
if
(
!
pData
)
{
return
false
;
}
dataBlocks
->
pData
=
pData
;
dataBlocks
->
nAllocSize
=
nAllocSize
;
return
false
;
}
/**
* A builder of SSubmitBlk.
*/
typedef
struct
SSubmitBlkBuilder
{
// the metadata of the SSubmitBlk.
SSubmitBlk
*
metadata
;
// the array stores all the rows in a table, aka SArray<SMemRow>.
SArray
*
rows
;
}
SSubmitBlkBuilder
;
/**
* Create a SSubmitBlkBuilder using exist metadata.
*
* @param metadata the metadata.
* @return the SSubmitBlkBuilder.
*/
SSubmitBlkBuilder
*
createSSubmitBlkBuilder
(
SSubmitBlk
*
metadata
)
{
SSubmitBlkBuilder
*
builder
=
calloc
(
1
,
sizeof
(
SSubmitBlkBuilder
));
if
(
!
builder
)
{
return
NULL
;
}
builder
->
rows
=
taosArrayInit
(
1
,
sizeof
(
SMemRow
));
if
(
!
builder
->
rows
)
{
free
(
builder
);
return
NULL
;
}
builder
->
metadata
=
calloc
(
1
,
sizeof
(
SSubmitBlk
));
if
(
!
builder
->
metadata
)
{
taosArrayDestroy
(
&
builder
->
rows
);
free
(
builder
);
return
NULL
;
}
memcpy
(
builder
->
metadata
,
metadata
,
sizeof
(
SSubmitBlk
));
return
builder
;
}
/**
* Destroy the SSubmitBlkBuilder.
*
* @param builder
*/
void
destroySSubmitBlkBuilder
(
SSubmitBlkBuilder
*
builder
)
{
if
(
!
builder
)
{
return
;
}
taosArrayDestroy
(
&
builder
->
rows
);
free
(
builder
->
metadata
);
free
(
builder
);
}
/**
* Append a SSubmitBlk* to the builder. The table uid in pBlock must be the same with the builder's.
*
* @param builder the SSubmitBlkBuilder.
* @param pBlock the pBlock to append.
* @return whether the append is success.
*/
static
bool
appendSSubmitBlkBuilder
(
SSubmitBlkBuilder
*
builder
,
SSubmitBlk
*
pBlock
)
{
assert
(
pBlock
->
uid
==
builder
->
metadata
->
uid
);
assert
(
pBlock
->
schemaLen
==
0
);
char
*
pRow
=
pBlock
->
data
;
char
*
pEnd
=
pBlock
->
data
+
htonl
(
pBlock
->
dataLen
);
while
(
pRow
<
pEnd
)
{
if
(
!
taosArrayPush
(
builder
->
rows
,
&
pRow
))
{
return
false
;
}
pRow
+=
memRowTLen
(
pRow
);
}
return
true
;
}
/**
* A util function to sort SArray<SMemRow> by key.
*/
static
int32_t
compareSMemRow
(
const
void
*
x
,
const
void
*
y
)
{
TSKEY
left
=
memRowKey
(
*
(
void
**
)
x
);
TSKEY
right
=
memRowKey
(
*
(
void
**
)
y
);
if
(
left
==
right
)
{
return
0
;
}
else
{
return
left
>
right
?
1
:
-
1
;
}
}
/**
* Build and write SSubmitBlk to `target`
*
* @param builder the SSubmitBlkBuilder.
* @param target the target to write.
* @return the writen bytes.
*/
static
size_t
writeSSubmitBlkBuilder
(
SSubmitBlkBuilder
*
builder
,
SSubmitBlk
*
target
)
{
memcpy
(
target
,
builder
->
metadata
,
sizeof
(
SSubmitBlk
));
uint32_t
dataLen
=
0
;
taosArraySort
(
builder
->
rows
,
compareSMemRow
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
builder
->
rows
);
++
i
)
{
char
*
pRow
=
*
(
char
**
)
(
taosArrayGet
(
builder
->
rows
,
i
));
memcpy
(
POINTER_SHIFT
(
target
->
data
,
dataLen
),
pRow
,
memRowTLen
(
pRow
));
dataLen
+=
memRowTLen
(
pRow
);
}
target
->
schemaLen
=
0
;
target
->
dataLen
=
htonl
(
dataLen
);
target
->
numOfRows
=
htons
(
taosArrayGetSize
(
builder
->
rows
));
return
dataLen
+
sizeof
(
SSubmitBlk
);
}
/**
* Get the expected writen bytes of `writeSSubmitBlkBuilder`.
*
* @param builder the SSubmitBlkBuilder.
* @return the expected writen bytes of `writeSSubmitBlkBuilder`.
*/
static
size_t
writenSizeSSubmitBlkBuilder
(
SSubmitBlkBuilder
*
builder
)
{
size_t
dataLen
=
0
;
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
builder
->
rows
);
++
i
)
{
char
*
pRow
=
*
(
char
**
)
(
taosArrayGet
(
builder
->
rows
,
i
));
dataLen
+=
memRowTLen
(
pRow
);
}
return
dataLen
+
sizeof
(
SSubmitBlk
);
}
/**
* The builder to build SSubmitMsg::blocks.
*/
typedef
struct
SSubmitMsgBlocksBuilder
{
// SHashObj<table_uid, SSubmitBlkBuilder*>.
SHashObj
*
blockBuilders
;
int64_t
vgId
;
}
SSubmitMsgBlocksBuilder
;
/**
* Create a SSubmitMsgBuilder.
*
* @param vgId the vgId of SSubmitMsg.
* @return the SSubmitMsgBuilder.
*/
static
SSubmitMsgBlocksBuilder
*
createSSubmitMsgBuilder
(
int64_t
vgId
)
{
SSubmitMsgBlocksBuilder
*
builder
=
calloc
(
1
,
sizeof
(
SSubmitMsgBlocksBuilder
));
if
(
!
builder
)
{
return
NULL
;
}
builder
->
vgId
=
vgId
;
builder
->
blockBuilders
=
taosHashInit
(
1
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
if
(
!
builder
->
blockBuilders
)
{
free
(
builder
);
return
NULL
;
}
return
builder
;
}
/**
* Get the expected writen bytes of `writeSSubmitMsgBlocksBuilder`.
*
* @param builder the SSubmitMsgBlocksBuilder.
* @return the expected writen bytes of `writeSSubmitMsgBlocksBuilder`.
*/
static
size_t
writenSizeSSubmitMsgBuilder
(
SSubmitMsgBlocksBuilder
*
builder
)
{
size_t
allocSize
=
0
;
SSubmitBlkBuilder
**
iter
=
taosHashIterate
(
builder
->
blockBuilders
,
NULL
);
while
(
iter
)
{
SSubmitBlkBuilder
*
blocksBuilder
=
*
iter
;
allocSize
+=
writenSizeSSubmitBlkBuilder
(
blocksBuilder
);
iter
=
taosHashIterate
(
builder
->
blockBuilders
,
iter
);
}
return
allocSize
;
}
/**
* Build and write SSubmitMsg::blocks to `pBlocks`
*
* @param builder the SSubmitBlkBuilder.
* @param pBlocks the target to write.
* @return the writen bytes.
*/
size_t
writeSSubmitMsgBlocksBuilder
(
SSubmitMsgBlocksBuilder
*
builder
,
SSubmitBlk
*
pBlocks
)
{
size_t
nWrite
=
0
;
SSubmitBlkBuilder
**
iter
=
taosHashIterate
(
builder
->
blockBuilders
,
NULL
);
while
(
iter
)
{
SSubmitBlkBuilder
*
blocksBuilder
=
*
iter
;
SSubmitBlk
*
pBlock
=
POINTER_SHIFT
(
pBlocks
,
nWrite
);
nWrite
+=
writeSSubmitBlkBuilder
(
blocksBuilder
,
pBlock
);
iter
=
taosHashIterate
(
builder
->
blockBuilders
,
iter
);
}
return
nWrite
;
}
/**
* Destroy the SSubmitMsgBlocksBuilder.
*
* @param builder the SSubmitMsgBlocksBuilder to destroy.
*/
static
void
destroySSubmitMsgBuilder
(
SSubmitMsgBlocksBuilder
*
builder
)
{
if
(
!
builder
)
{
return
;
}
SSubmitBlkBuilder
**
iter
=
taosHashIterate
(
builder
->
blockBuilders
,
NULL
);
while
(
iter
)
{
destroySSubmitBlkBuilder
(
*
iter
);
iter
=
taosHashIterate
(
builder
->
blockBuilders
,
iter
);
}
taosHashCleanup
(
builder
->
blockBuilders
);
}
/**
* If the SSubmitBlkBuilder of pBlock->uid is present, returns it. Otherwise, build a new SSubmitBlkBuilder.
*
* @param builder the SSubmitMsgBlocksBuilder.
* @param pBlock the SSubmitBlk.
* @return the SSubmitBlkBuilder (NULL means failure).
*/
static
SSubmitBlkBuilder
*
computeIfAbsentSSubmitBlkBuilder
(
SSubmitMsgBlocksBuilder
*
builder
,
SSubmitBlk
*
pBlock
)
{
SSubmitBlkBuilder
**
iter
=
taosHashGet
(
builder
->
blockBuilders
,
&
pBlock
->
uid
,
sizeof
(
pBlock
->
uid
));
SSubmitBlkBuilder
*
blocksBuilder
=
NULL
;
if
(
iter
)
{
return
*
iter
;
}
blocksBuilder
=
createSSubmitBlkBuilder
(
pBlock
);
if
(
!
blocksBuilder
)
{
return
NULL
;
}
if
(
taosHashPut
(
builder
->
blockBuilders
,
&
pBlock
->
uid
,
sizeof
(
pBlock
->
uid
),
&
blocksBuilder
,
sizeof
(
SArray
*
)))
{
destroySSubmitBlkBuilder
(
blocksBuilder
);
return
NULL
;
}
return
blocksBuilder
;
}
/**
* Append SSubmitMsg* to the SSubmitMsgBlocksBuilder.
*
* @param builder the SSubmitMsgBlocksBuilder.
* @param pMsg the SSubmitMsg*
* @param numOfBlocks the number of blocks in SSubmitMsg.
* @return whether the append is success.
*/
static
bool
appendSSubmitMsgBlocks
(
SSubmitMsgBlocksBuilder
*
builder
,
SSubmitBlk
*
pBlocks
,
size_t
numOfBlocks
)
{
SSubmitBlk
*
pBlock
=
pBlocks
;
for
(
size_t
i
=
0
;
i
<
numOfBlocks
;
++
i
)
{
assert
(
pBlock
->
schemaLen
==
0
);
SSubmitBlkBuilder
*
blocksBuilder
=
computeIfAbsentSSubmitBlkBuilder
(
builder
,
pBlock
);
if
(
!
blocksBuilder
)
{
return
false
;
}
if
(
!
appendSSubmitBlkBuilder
(
blocksBuilder
,
pBlock
))
{
return
false
;
}
size_t
blockSize
=
sizeof
(
SSubmitBlk
)
+
htonl
(
pBlock
->
dataLen
);
pBlock
=
POINTER_SHIFT
(
pBlock
,
blockSize
);
}
}
return
true
;
}
}
/**
/**
* Merge the KV-PayLoad SQL objects into single one. (the statements here must be an insertion statement).
* Merge the KV-PayLoad SQL objects into single one.
* The statements here must be an insertion statement and no schema attached.
*
*
* @param statements the array of statements. a.k.a SArray<SSqlObj*>.
* @param statements the array of statements. a.k.a SArray<SSqlObj*>.
* @param result the returned result. result is not null!
* @param result the returned result. result is not null!
...
@@ -2242,6 +2531,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
...
@@ -2242,6 +2531,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
// initialize the `pVnodeDataBlockHashList`.
// initialize the `pVnodeDataBlockHashList`.
assert
(
pMergeInsertParam
->
payloadType
==
PAYLOAD_TYPE_KV
);
assert
(
pMergeInsertParam
->
payloadType
==
PAYLOAD_TYPE_KV
);
assert
(
!
pMergeInsertParam
->
schemaAttached
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pMergeInsertParam
->
pDataBlocks
);
++
i
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pMergeInsertParam
->
pDataBlocks
);
++
i
)
{
STableDataBlocks
*
pDataBlocks
=
*
((
STableDataBlocks
**
)
taosArrayGet
(
pMergeInsertParam
->
pDataBlocks
,
i
));
STableDataBlocks
*
pDataBlocks
=
*
((
STableDataBlocks
**
)
taosArrayGet
(
pMergeInsertParam
->
pDataBlocks
,
i
));
if
(
taosHashPut
(
pVnodeDataBlockHashList
,
&
pDataBlocks
->
vgId
,
sizeof
(
pDataBlocks
->
vgId
),
&
pDataBlocks
,
sizeof
(
STableDataBlocks
*
)))
{
if
(
taosHashPut
(
pVnodeDataBlockHashList
,
&
pDataBlocks
->
vgId
,
sizeof
(
pDataBlocks
->
vgId
),
&
pDataBlocks
,
sizeof
(
STableDataBlocks
*
)))
{
...
@@ -2257,17 +2547,12 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
...
@@ -2257,17 +2547,12 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
SInsertStatementParam
*
pInsertParam
=
&
pCmd
->
insertParam
;
SInsertStatementParam
*
pInsertParam
=
&
pCmd
->
insertParam
;
assert
(
pInsertParam
->
payloadType
==
PAYLOAD_TYPE_KV
);
assert
(
pInsertParam
->
payloadType
==
PAYLOAD_TYPE_KV
);
assert
(
!
pInsertParam
->
schemaAttached
);
// merge all the data blocks by vgroup id.
// merge all the data blocks by vgroup id.
for
(
int
j
=
0
;
pInsertParam
->
pDataBlocks
&&
j
<
taosArrayGetSize
(
pInsertParam
->
pDataBlocks
);
++
j
)
{
for
(
int
j
=
0
;
pInsertParam
->
pDataBlocks
&&
j
<
taosArrayGetSize
(
pInsertParam
->
pDataBlocks
);
++
j
)
{
STableDataBlocks
*
tableBlock
=
*
((
STableDataBlocks
**
)
taosArrayGet
(
pInsertParam
->
pDataBlocks
,
j
));
STableDataBlocks
*
tableBlock
=
*
((
STableDataBlocks
**
)
taosArrayGet
(
pInsertParam
->
pDataBlocks
,
j
));
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)
tableBlock
->
pData
;
// SSubmitMsg *pBlocks = (SSubmitMsg *)tableBlock->pData;
// skip the empty data block.
if
(
pBlocks
->
numOfRows
<=
0
)
{
tscDebug
(
"0x%"
PRIx64
" table %s data block is empty"
,
pInsertParam
->
objectId
,
tableBlock
->
tableName
.
tname
);
continue
;
}
// get the data blocks of vgroup id.
// get the data blocks of vgroup id.
STableDataBlocks
*
dataBuf
=
NULL
;
STableDataBlocks
*
dataBuf
=
NULL
;
...
@@ -2287,14 +2572,13 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
...
@@ -2287,14 +2572,13 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
dataBuf
=
*
iter
;
dataBuf
=
*
iter
;
}
}
// the allocated size is too small.
// header: SMsgDesc + SSubmitMsg(without SSubmitBlk[])
int64_t
destSize
=
dataBuf
->
size
+
(
tableBlock
->
size
-
tableBlock
->
headerSize
);
assert
(
dataBuf
->
headerSize
==
(
sizeof
(
SMsgDesc
)
+
sizeof
(
SSubmitMsg
)));
if
(
dataBuf
->
nAllocSize
<
destSize
)
{
assert
(
dataBuf
->
headerSize
==
tableBlock
->
headerSize
);
dataBuf
->
nAllocSize
=
(
uint32_t
)(
destSize
*
1
.
5
);
const
size_t
headerSize
=
tableBlock
->
headerSize
;
char
*
tmp
=
realloc
(
dataBuf
->
pData
,
dataBuf
->
nAllocSize
);
const
size_t
destSize
=
dataBuf
->
size
+
(
tableBlock
->
size
-
tableBlock
->
headerSize
);
if
(
tmp
!=
NULL
)
{
dataBuf
->
pData
=
tmp
;
if
(
!
resizeDataBlocksData
(
dataBuf
,
debugFlag
))
{
}
else
{
// failed to allocate memory, free already allocated memory and return error code
tscError
(
"0x%"
PRIx64
" failed to allocate memory for merging submit block, size:%d"
,
pInsertParam
->
objectId
,
tscError
(
"0x%"
PRIx64
" failed to allocate memory for merging submit block, size:%d"
,
pInsertParam
->
objectId
,
dataBuf
->
nAllocSize
);
dataBuf
->
nAllocSize
);
...
@@ -2302,11 +2586,16 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
...
@@ -2302,11 +2586,16 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
tfree
(
dataBuf
->
pData
);
tfree
(
dataBuf
->
pData
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
}
// copy the data into vgroup data blocks.
// pData = SMsgDesc + SSubmitMsg(with SSubmitBlk[])
memcpy
(
dataBuf
->
pData
+
dataBuf
->
size
,
tableBlock
->
pData
+
tableBlock
->
headerSize
,
tableBlock
->
size
-
tableBlock
->
headerSize
);
SSubmitBlk
*
target
=
(
SSubmitBlk
*
)
(
dataBuf
->
pData
+
headerSize
);
dataBuf
->
size
+=
tableBlock
->
size
-
tableBlock
->
headerSize
;
SSubmitBlk
*
source
=
(
SSubmitBlk
*
)
(
tableBlock
->
pData
+
headerSize
);
const
size_t
targetSize
=
dataBuf
->
size
-
headerSize
;
const
size_t
sourceSize
=
tableBlock
->
size
-
headerSize
;
memcpy
(
POINTER_SHIFT
(
target
,
targetSize
),
source
,
sourceSize
);
dataBuf
->
size
=
destSize
;
dataBuf
->
numOfTables
+=
tableBlock
->
numOfTables
;
dataBuf
->
numOfTables
+=
tableBlock
->
numOfTables
;
tscDestroyDataBlock
(
pSql
,
tableBlock
,
false
);
tscDestroyDataBlock
(
pSql
,
tableBlock
,
false
);
}
}
...
@@ -2319,6 +2608,36 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
...
@@ -2319,6 +2608,36 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
// clean up.
// clean up.
taosHashCleanup
(
pVnodeDataBlockHashList
);
taosHashCleanup
(
pVnodeDataBlockHashList
);
*
result
=
merged
;
*
result
=
merged
;
// rebuild SubmitMsg::blocks.
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pMergeDataBlocks
);
++
i
)
{
STableDataBlocks
*
pDataBlocks
=
*
((
STableDataBlocks
**
)
taosArrayGet
(
pMergeDataBlocks
,
i
));
SSubmitMsgBlocksBuilder
*
builder
=
createSSubmitMsgBuilder
(
pDataBlocks
->
vgId
);
if
(
!
builder
)
{
break
;
}
SSubmitBlk
*
pBlock
=
(
SSubmitBlk
*
)
(
pDataBlocks
->
pData
+
pDataBlocks
->
headerSize
);
if
(
!
appendSSubmitMsgBlocks
(
builder
,
pBlock
,
pDataBlocks
->
numOfTables
))
{
destroySSubmitMsgBuilder
(
builder
);
break
;
}
size_t
nAllocSize
=
pDataBlocks
->
headerSize
+
writenSizeSSubmitMsgBuilder
(
builder
);
char
*
pData
=
calloc
(
1
,
nAllocSize
);
if
(
!
pData
)
{
destroySSubmitMsgBuilder
(
builder
);
break
;
}
pDataBlocks
->
nAllocSize
=
nAllocSize
;
pDataBlocks
->
size
=
pDataBlocks
->
headerSize
+
writeSSubmitMsgBlocksBuilder
(
builder
,
(
SSubmitBlk
*
)(
pData
+
pDataBlocks
->
headerSize
));
memcpy
(
pData
,
pDataBlocks
->
pData
,
pDataBlocks
->
headerSize
);
free
(
pDataBlocks
->
pData
);
pDataBlocks
->
pData
=
pData
;
destroySSubmitMsgBuilder
(
builder
);
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -2326,7 +2645,11 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
...
@@ -2326,7 +2645,11 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
const
int
INSERT_HEAD_SIZE
=
sizeof
(
SMsgDesc
)
+
sizeof
(
SSubmitMsg
);
const
int
INSERT_HEAD_SIZE
=
sizeof
(
SMsgDesc
)
+
sizeof
(
SSubmitMsg
);
int
code
=
0
;
int
code
=
0
;
bool
isRawPayload
=
IS_RAW_PAYLOAD
(
pInsertParam
->
payloadType
);
bool
isRawPayload
=
IS_RAW_PAYLOAD
(
pInsertParam
->
payloadType
);
void
*
pVnodeDataBlockHashList
=
taosHashInit
(
128
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
size_t
initialSize
=
taosHashGetSize
(
pInsertParam
->
pTableBlockHashList
);
initialSize
=
initialSize
>
128
?
128
:
initialSize
;
void
*
pVnodeDataBlockHashList
=
taosHashInit
(
initialSize
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
false
);
SArray
*
pTableDataBlockList
=
taosArrayInit
(
taosHashGetSize
(
pInsertParam
->
pTableBlockHashList
),
POINTER_BYTES
);
SArray
*
pVnodeDataBlockList
=
taosArrayInit
(
8
,
POINTER_BYTES
);
SArray
*
pVnodeDataBlockList
=
taosArrayInit
(
8
,
POINTER_BYTES
);
STableDataBlocks
**
p
=
taosHashIterate
(
pInsertParam
->
pTableBlockHashList
,
NULL
);
STableDataBlocks
**
p
=
taosHashIterate
(
pInsertParam
->
pTableBlockHashList
,
NULL
);
...
@@ -2337,6 +2660,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
...
@@ -2337,6 +2660,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
while
(
pOneTableBlock
)
{
while
(
pOneTableBlock
)
{
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)
pOneTableBlock
->
pData
;
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)
pOneTableBlock
->
pData
;
taosArrayPush
(
pTableDataBlockList
,
&
pOneTableBlock
);
if
(
pBlocks
->
numOfRows
>
0
)
{
if
(
pBlocks
->
numOfRows
>
0
)
{
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t
expandSize
=
isRawPayload
?
getRowExpandSize
(
pOneTableBlock
->
pTableMeta
)
:
0
;
int32_t
expandSize
=
isRawPayload
?
getRowExpandSize
(
pOneTableBlock
->
pTableMeta
)
:
0
;
...
@@ -2347,6 +2671,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
...
@@ -2347,6 +2671,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"0x%"
PRIx64
" failed to prepare the data block buffer for merging table data, code:%d"
,
pInsertParam
->
objectId
,
ret
);
tscError
(
"0x%"
PRIx64
" failed to prepare the data block buffer for merging table data, code:%d"
,
pInsertParam
->
objectId
,
ret
);
taosHashCleanup
(
pVnodeDataBlockHashList
);
taosHashCleanup
(
pVnodeDataBlockHashList
);
taosArrayDestroy
(
&
pTableDataBlockList
);
tscDestroyBlockArrayList
(
pSql
,
pVnodeDataBlockList
);
tscDestroyBlockArrayList
(
pSql
,
pVnodeDataBlockList
);
tfree
(
blkKeyInfo
.
pKeyTuple
);
tfree
(
blkKeyInfo
.
pKeyTuple
);
return
ret
;
return
ret
;
...
@@ -2431,15 +2756,22 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
...
@@ -2431,15 +2756,22 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
pOneTableBlock
=
*
p
;
pOneTableBlock
=
*
p
;
}
}
extractTableNameList
(
pSql
,
pInsertParam
);
extractTableNameList
(
pSql
,
pInsertParam
,
pTableDataBlockList
);
if
(
freeBlockMap
)
{
if
(
freeBlockMap
&&
pInsertParam
->
pTableBlockHashList
)
{
pInsertParam
->
pTableBlockHashList
=
tscDestroyBlockHashTable
(
pSql
,
pInsertParam
->
pTableBlockHashList
,
false
);
taosHashCleanup
(
pInsertParam
->
pTableBlockHashList
);
pInsertParam
->
pTableBlockHashList
=
NULL
;
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTableDataBlockList
);
++
i
)
{
STableDataBlocks
*
pDataBlocks
=
*
((
STableDataBlocks
**
)
taosArrayGet
(
pTableDataBlockList
,
i
));
tscDestroyDataBlock
(
pSql
,
pDataBlocks
,
false
);
}
}
}
// free the table data blocks;
// free the table data blocks;
pInsertParam
->
pDataBlocks
=
pVnodeDataBlockList
;
pInsertParam
->
pDataBlocks
=
pVnodeDataBlockList
;
taosHashCleanup
(
pVnodeDataBlockHashList
);
taosHashCleanup
(
pVnodeDataBlockHashList
);
taosArrayDestroy
(
&
pTableDataBlockList
);
tfree
(
blkKeyInfo
.
pKeyTuple
);
tfree
(
blkKeyInfo
.
pKeyTuple
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录