Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
641531bc
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
641531bc
编写于
7月 06, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: fix compile issue
上级
2304e12c
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
15 addition
and
15 deletion
+15
-15
source/libs/executor/src/dataInserter.c
source/libs/executor/src/dataInserter.c
+15
-15
未找到文件。
source/libs/executor/src/dataInserter.c
浏览文件 @
641531bc
...
...
@@ -45,7 +45,7 @@ typedef struct SDataInserterHandle {
SDataDeleterNode
*
pDeleter
;
SDeleterParam
*
pParam
;
STaosQueue
*
pDataBlocks
;
SData
DeleterBuf
nextOutput
;
SData
InserterBuf
nextOutput
;
int32_t
status
;
bool
queryEnd
;
uint64_t
useconds
;
...
...
@@ -69,7 +69,7 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
return
false
;
}
static
void
toDataCacheEntry
(
SData
DeleterHandle
*
pHandle
,
const
SInputData
*
pInput
,
SDataDele
terBuf
*
pBuf
)
{
static
void
toDataCacheEntry
(
SData
InserterHandle
*
pHandle
,
const
SInputData
*
pInput
,
SDataInser
terBuf
*
pBuf
)
{
int32_t
numOfCols
=
LIST_LENGTH
(
pHandle
->
pSchema
->
pSlots
);
SDataCacheEntry
*
pEntry
=
(
SDataCacheEntry
*
)
pBuf
->
pData
;
...
...
@@ -98,7 +98,7 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp
atomic_add_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pEntry
->
dataLen
);
}
static
bool
allocBuf
(
SData
DeleterHandle
*
pDeleter
,
const
SInputData
*
pInput
,
SDataDele
terBuf
*
pBuf
)
{
static
bool
allocBuf
(
SData
InserterHandle
*
pDeleter
,
const
SInputData
*
pInput
,
SDataInser
terBuf
*
pBuf
)
{
uint32_t
capacity
=
pDeleter
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
;
if
(
taosQueueItemSize
(
pDeleter
->
pDataBlocks
)
>
capacity
)
{
qError
(
"SinkNode queue is full, no capacity, max:%d, current:%d, no capacity"
,
capacity
,
...
...
@@ -116,7 +116,7 @@ static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDa
return
NULL
!=
pBuf
->
pData
;
}
static
int32_t
updateStatus
(
SData
Dele
terHandle
*
pDeleter
)
{
static
int32_t
updateStatus
(
SData
Inser
terHandle
*
pDeleter
)
{
taosThreadMutexLock
(
&
pDeleter
->
mutex
);
int32_t
blockNums
=
taosQueueItemSize
(
pDeleter
->
pDataBlocks
);
int32_t
status
=
...
...
@@ -127,7 +127,7 @@ static int32_t updateStatus(SDataDeleterHandle* pDeleter) {
return
status
;
}
static
int32_t
getStatus
(
SData
Dele
terHandle
*
pDeleter
)
{
static
int32_t
getStatus
(
SData
Inser
terHandle
*
pDeleter
)
{
taosThreadMutexLock
(
&
pDeleter
->
mutex
);
int32_t
status
=
pDeleter
->
status
;
taosThreadMutexUnlock
(
&
pDeleter
->
mutex
);
...
...
@@ -135,8 +135,8 @@ static int32_t getStatus(SDataDeleterHandle* pDeleter) {
}
static
int32_t
putDataBlock
(
SDataSinkHandle
*
pHandle
,
const
SInputData
*
pInput
,
bool
*
pContinue
)
{
SData
DeleterHandle
*
pDeleter
=
(
SDataDele
terHandle
*
)
pHandle
;
SData
DeleterBuf
*
pBuf
=
taosAllocateQitem
(
sizeof
(
SDataDele
terBuf
),
DEF_QITEM
);
SData
InserterHandle
*
pDeleter
=
(
SDataInser
terHandle
*
)
pHandle
;
SData
InserterBuf
*
pBuf
=
taosAllocateQitem
(
sizeof
(
SDataInser
terBuf
),
DEF_QITEM
);
if
(
NULL
==
pBuf
||
!
allocBuf
(
pDeleter
,
pInput
,
pBuf
))
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
...
...
@@ -147,7 +147,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
}
static
void
endPut
(
struct
SDataSinkHandle
*
pHandle
,
uint64_t
useconds
)
{
SData
DeleterHandle
*
pDeleter
=
(
SDataDele
terHandle
*
)
pHandle
;
SData
InserterHandle
*
pDeleter
=
(
SDataInser
terHandle
*
)
pHandle
;
taosThreadMutexLock
(
&
pDeleter
->
mutex
);
pDeleter
->
queryEnd
=
true
;
pDeleter
->
useconds
=
useconds
;
...
...
@@ -155,16 +155,16 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
}
static
void
getDataLength
(
SDataSinkHandle
*
pHandle
,
int32_t
*
pLen
,
bool
*
pQueryEnd
)
{
SData
DeleterHandle
*
pDeleter
=
(
SDataDele
terHandle
*
)
pHandle
;
SData
InserterHandle
*
pDeleter
=
(
SDataInser
terHandle
*
)
pHandle
;
if
(
taosQueueEmpty
(
pDeleter
->
pDataBlocks
))
{
*
pQueryEnd
=
pDeleter
->
queryEnd
;
*
pLen
=
0
;
return
;
}
SData
Dele
terBuf
*
pBuf
=
NULL
;
SData
Inser
terBuf
*
pBuf
=
NULL
;
taosReadQitem
(
pDeleter
->
pDataBlocks
,
(
void
**
)
&
pBuf
);
memcpy
(
&
pDeleter
->
nextOutput
,
pBuf
,
sizeof
(
SData
Dele
terBuf
));
memcpy
(
&
pDeleter
->
nextOutput
,
pBuf
,
sizeof
(
SData
Inser
terBuf
));
taosFreeQitem
(
pBuf
);
*
pLen
=
((
SDataCacheEntry
*
)(
pDeleter
->
nextOutput
.
pData
))
->
dataLen
;
*
pQueryEnd
=
pDeleter
->
queryEnd
;
...
...
@@ -172,7 +172,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
}
static
int32_t
getDataBlock
(
SDataSinkHandle
*
pHandle
,
SOutputData
*
pOutput
)
{
SData
DeleterHandle
*
pDeleter
=
(
SDataDele
terHandle
*
)
pHandle
;
SData
InserterHandle
*
pDeleter
=
(
SDataInser
terHandle
*
)
pHandle
;
if
(
NULL
==
pDeleter
->
nextOutput
.
pData
)
{
assert
(
pDeleter
->
queryEnd
);
pOutput
->
useconds
=
pDeleter
->
useconds
;
...
...
@@ -202,11 +202,11 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
}
static
int32_t
destroyDataSinker
(
SDataSinkHandle
*
pHandle
)
{
SData
DeleterHandle
*
pDeleter
=
(
SDataDele
terHandle
*
)
pHandle
;
SData
InserterHandle
*
pDeleter
=
(
SDataInser
terHandle
*
)
pHandle
;
atomic_sub_fetch_64
(
&
gDataSinkStat
.
cachedSize
,
pDeleter
->
cachedSize
);
taosMemoryFreeClear
(
pDeleter
->
nextOutput
.
pData
);
while
(
!
taosQueueEmpty
(
pDeleter
->
pDataBlocks
))
{
SData
Dele
terBuf
*
pBuf
=
NULL
;
SData
Inser
terBuf
*
pBuf
=
NULL
;
taosReadQitem
(
pDeleter
->
pDataBlocks
,
(
void
**
)
&
pBuf
);
taosMemoryFreeClear
(
pBuf
->
pData
);
taosFreeQitem
(
pBuf
);
...
...
@@ -217,7 +217,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
}
static
int32_t
getCacheSize
(
struct
SDataSinkHandle
*
pHandle
,
uint64_t
*
size
)
{
SData
DeleterHandle
*
pDispatcher
=
(
SDataDele
terHandle
*
)
pHandle
;
SData
InserterHandle
*
pDispatcher
=
(
SDataInser
terHandle
*
)
pHandle
;
*
size
=
atomic_load_64
(
&
pDispatcher
->
cachedSize
);
return
TSDB_CODE_SUCCESS
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录