Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a5dd57de
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
a5dd57de
编写于
1月 07, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/dnode3
上级
322f3cd9
e5732c6d
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
353 addition
and
5 deletion
+353
-5
include/libs/planner/planner.h
include/libs/planner/planner.h
+5
-2
source/libs/executor/CMakeLists.txt
source/libs/executor/CMakeLists.txt
+1
-1
source/libs/executor/inc/dataSinkInt.h
source/libs/executor/inc/dataSinkInt.h
+45
-0
source/libs/executor/inc/dataSinkMgt.h
source/libs/executor/inc/dataSinkMgt.h
+104
-0
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+140
-0
source/libs/executor/src/dataSinkMgt.c
source/libs/executor/src/dataSinkMgt.c
+56
-0
source/libs/parser/src/insertParser.c
source/libs/parser/src/insertParser.c
+2
-2
未找到文件。
include/libs/planner/planner.h
浏览文件 @
a5dd57de
...
...
@@ -50,8 +50,10 @@ struct SQueryStmtInfo;
typedef
SSchema
SSlotSchema
;
typedef
struct
SDataBlockSchema
{
SSlotSchema
*
pSchema
;
int32_t
numOfCols
;
// number of columns
SSlotSchema
*
pSchema
;
int32_t
numOfCols
;
// number of columns
int32_t
resultRowSize
;
int16_t
precision
;
}
SDataBlockSchema
;
typedef
struct
SQueryNodeBasicInfo
{
...
...
@@ -61,6 +63,7 @@ typedef struct SQueryNodeBasicInfo {
typedef
struct
SDataSink
{
SQueryNodeBasicInfo
info
;
SDataBlockSchema
schema
;
}
SDataSink
;
typedef
struct
SDataDispatcher
{
...
...
source/libs/executor/CMakeLists.txt
浏览文件 @
a5dd57de
...
...
@@ -8,5 +8,5 @@ target_include_directories(
target_link_libraries
(
executor
PRIVATE os util common function parser
PRIVATE os util common function parser
planner qcom
)
\ No newline at end of file
source/libs/executor/inc/dataSinkInt.h
0 → 100644
浏览文件 @
a5dd57de
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _DATA_SINK_INT_H
#define _DATA_SINK_INT_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "common.h"
#include "dataSinkMgt.h"
struct
SDataSink
;
struct
SDataSinkHandle
;
typedef
int32_t
(
*
FPutDataBlock
)(
struct
SDataSinkHandle
*
pHandle
,
const
SDataResult
*
pRes
);
typedef
int32_t
(
*
FGetDataBlock
)(
struct
SDataSinkHandle
*
pHandle
,
char
*
pData
,
int32_t
*
pLen
);
typedef
int32_t
(
*
FDestroyDataSinker
)(
struct
SDataSinkHandle
*
pHandle
);
typedef
struct
SDataSinkHandle
{
FPutDataBlock
fPut
;
FGetDataBlock
fGet
;
FDestroyDataSinker
fDestroy
;
}
SDataSinkHandle
;
int32_t
createDataDispatcher
(
const
struct
SDataSink
*
pDataSink
,
DataSinkHandle
*
pHandle
);
#ifdef __cplusplus
}
#endif
#endif
/*_DATA_SINK_INT_H*/
source/libs/executor/inc/dataSinkMgt.h
0 → 100644
浏览文件 @
a5dd57de
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _DATA_SINK_MGT_H
#define _DATA_SINK_MGT_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "os.h"
#include "executorimpl.h"
#define DS_CAPACITY_ENOUGH 1
#define DS_CAPACITY_FULL 2
#define DS_NEED_SCHEDULE 3
struct
SDataSink
;
struct
SSDataBlock
;
typedef
struct
SDataSinkMgtCfg
{
uint32_t
maxDataBlockNum
;
uint32_t
maxDataBlockNumPerQuery
;
}
SDataSinkMgtCfg
;
int32_t
dsDataSinkMgtInit
(
SDataSinkMgtCfg
*
cfg
);
typedef
void
*
DataSinkHandle
;
typedef
struct
SDataResult
{
SQueryCostInfo
profile
;
const
SSDataBlock
*
pData
;
SHashObj
*
pTableRetrieveTsMap
;
}
SDataResult
;
/**
* Create a subplan's datasinker handle for all later operations.
* @param pDataSink
* @param pHandle output
* @return error code
*/
int32_t
dsCreateDataSinker
(
const
struct
SDataSink
*
pDataSink
,
DataSinkHandle
*
pHandle
);
/**
* Put the result set returned by the executor into datasinker.
* @param handle
* @param pRes
* @return error code
*/
int32_t
dsPutDataBlock
(
DataSinkHandle
handle
,
const
SDataResult
*
pRes
);
/**
* Get the length of the data returned by the next call to dsGetDataBlock.
* @param handle
* @return data length
*/
int32_t
dsGetDataLength
(
DataSinkHandle
handle
);
/**
* Get data, the caller needs to allocate data memory.
* @param handle
* @param pData output
* @param pLen output
* @return error code
*/
int32_t
dsGetDataBlock
(
DataSinkHandle
handle
,
char
*
pData
,
int32_t
*
pLen
);
/**
* Get the datasinker state, after each dsPutDataBlock and dsGetDataBlock call.
* @param handle
* @return datasinker status
*/
int32_t
dsGetStatus
(
DataSinkHandle
handle
);
/**
* After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue.
* @param ahandle
* @param pItem
*/
void
dsScheduleProcess
(
void
*
ahandle
,
void
*
pItem
);
/**
* Destroy the datasinker handle.
* @param handle
*/
void
dsDestroyDataSinker
(
DataSinkHandle
handle
);
#ifdef __cplusplus
}
#endif
#endif
/*_DATA_SINK_MGT_H*/
source/libs/executor/src/dataDispatcher.c
0 → 100644
浏览文件 @
a5dd57de
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "planner.h"
#include "tcompression.h"
#include "tglobal.h"
#include "tqueue.h"
#define GET_BUF_DATA(buf) (buf)->pData + (buf)->pos
#define GET_BUF_REMAIN(buf) (buf)->remain
typedef
struct
SBuf
{
int32_t
size
;
int32_t
pos
;
int32_t
remain
;
char
*
pData
;
}
SBuf
;
typedef
struct
SDataDispatchHandle
{
SDataSinkHandle
sink
;
SDataBlockSchema
schema
;
STaosQueue
*
pDataBlocks
;
SBuf
buf
;
}
SDataDispatchHandle
;
static
bool
needCompress
(
const
SSDataBlock
*
pData
,
const
SDataBlockSchema
*
pSchema
)
{
if
(
tsCompressColData
<
0
||
0
==
pData
->
info
.
rows
)
{
return
false
;
}
for
(
int32_t
col
=
0
;
col
<
pSchema
->
numOfCols
;
++
col
)
{
SColumnInfoData
*
pColRes
=
taosArrayGet
(
pData
->
pDataBlock
,
col
);
int32_t
colSize
=
pColRes
->
info
.
bytes
*
pData
->
info
.
rows
;
if
(
NEEDTO_COMPRESS_QUERY
(
colSize
))
{
return
true
;
}
}
return
false
;
}
static
int32_t
compressQueryColData
(
SColumnInfoData
*
pColRes
,
int32_t
numOfRows
,
char
*
data
,
int8_t
compressed
)
{
int32_t
colSize
=
pColRes
->
info
.
bytes
*
numOfRows
;
return
(
*
(
tDataTypes
[
pColRes
->
info
.
type
].
compFunc
))(
pColRes
->
pData
,
colSize
,
numOfRows
,
data
,
colSize
+
COMP_OVERFLOW_BYTES
,
compressed
,
NULL
,
0
);
}
static
void
doCopyQueryResultToMsg
(
const
SDataResult
*
pRes
,
const
SDataBlockSchema
*
pSchema
,
char
*
data
,
int8_t
compressed
,
int32_t
*
compLen
)
{
int32_t
*
compSizes
=
(
int32_t
*
)
data
;
if
(
compressed
)
{
data
+=
pSchema
->
numOfCols
*
sizeof
(
int32_t
);
}
for
(
int32_t
col
=
0
;
col
<
pSchema
->
numOfCols
;
++
col
)
{
SColumnInfoData
*
pColRes
=
taosArrayGet
(
pRes
->
pData
->
pDataBlock
,
col
);
if
(
compressed
)
{
compSizes
[
col
]
=
compressQueryColData
(
pColRes
,
pRes
->
pData
->
info
.
rows
,
data
,
compressed
);
data
+=
compSizes
[
col
];
*
compLen
+=
compSizes
[
col
];
compSizes
[
col
]
=
htonl
(
compSizes
[
col
]);
}
else
{
memmove
(
data
,
pColRes
->
pData
,
pColRes
->
info
.
bytes
*
pRes
->
pData
->
info
.
rows
);
data
+=
pColRes
->
info
.
bytes
*
pRes
->
pData
->
info
.
rows
;
}
}
int32_t
numOfTables
=
(
int32_t
)
taosHashGetSize
(
pRes
->
pTableRetrieveTsMap
);
*
(
int32_t
*
)
data
=
htonl
(
numOfTables
);
data
+=
sizeof
(
int32_t
);
STableIdInfo
*
item
=
taosHashIterate
(
pRes
->
pTableRetrieveTsMap
,
NULL
);
while
(
item
)
{
STableIdInfo
*
pDst
=
(
STableIdInfo
*
)
data
;
pDst
->
uid
=
htobe64
(
item
->
uid
);
pDst
->
key
=
htobe64
(
item
->
key
);
data
+=
sizeof
(
STableIdInfo
);
item
=
taosHashIterate
(
pRes
->
pTableRetrieveTsMap
,
item
);
}
}
static
void
toRetrieveResult
(
SDataDispatchHandle
*
pHandle
,
const
SDataResult
*
pRes
,
char
*
pData
,
int32_t
*
pContLen
)
{
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
pData
;
pRsp
->
useconds
=
htobe64
(
pRes
->
profile
.
elapsedTime
);
pRsp
->
precision
=
htons
(
pHandle
->
schema
.
precision
);
pRsp
->
compressed
=
(
int8_t
)
needCompress
(
pRes
->
pData
,
&
(
pHandle
->
schema
));
pRsp
->
numOfRows
=
htonl
(
pRes
->
pData
->
info
.
rows
);
*
pContLen
=
sizeof
(
int32_t
)
+
sizeof
(
STableIdInfo
)
*
taosHashGetSize
(
pRes
->
pTableRetrieveTsMap
)
+
sizeof
(
SRetrieveTableRsp
);
doCopyQueryResultToMsg
(
pRes
,
&
pHandle
->
schema
,
pRsp
->
data
,
pRsp
->
compressed
,
&
pRsp
->
compLen
);
*
pContLen
+=
(
pRsp
->
compressed
?
pRsp
->
compLen
:
pHandle
->
schema
.
resultRowSize
*
pRes
->
pData
->
info
.
rows
);
pRsp
->
compLen
=
htonl
(
pRsp
->
compLen
);
// todo completed
}
static
int32_t
putDataBlock
(
SDataSinkHandle
*
pHandle
,
const
SDataResult
*
pRes
)
{
SDataDispatchHandle
*
pDispatcher
=
(
SDataDispatchHandle
*
)
pHandle
;
int32_t
useSize
=
0
;
toRetrieveResult
(
pDispatcher
,
pRes
,
GET_BUF_DATA
(
&
pDispatcher
->
buf
),
&
useSize
);
}
static
int32_t
getDataBlock
(
SDataSinkHandle
*
pHandle
,
char
*
pData
,
int32_t
*
pLen
)
{
}
static
int32_t
destroyDataSinker
(
SDataSinkHandle
*
pHandle
)
{
}
int32_t
createDataDispatcher
(
const
SDataSink
*
pDataSink
,
DataSinkHandle
*
pHandle
)
{
SDataDispatchHandle
*
dispatcher
=
calloc
(
1
,
sizeof
(
SDataDispatchHandle
));
if
(
NULL
==
dispatcher
)
{
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
dispatcher
->
sink
.
fPut
=
putDataBlock
;
dispatcher
->
sink
.
fGet
=
getDataBlock
;
dispatcher
->
sink
.
fDestroy
=
destroyDataSinker
;
dispatcher
->
pDataBlocks
=
taosOpenQueue
();
if
(
NULL
==
dispatcher
->
pDataBlocks
)
{
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
}
*
pHandle
=
dispatcher
;
return
TSDB_CODE_SUCCESS
;
}
source/libs/executor/src/dataSinkMgt.c
0 → 100644
浏览文件 @
a5dd57de
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dataSinkMgt.h"
#include "dataSinkInt.h"
#include "planner.h"
int32_t
dsDataSinkMgtInit
(
SDataSinkMgtCfg
*
cfg
)
{
// todo
}
int32_t
dsCreateDataSinker
(
const
struct
SDataSink
*
pDataSink
,
DataSinkHandle
*
pHandle
)
{
if
(
DSINK_Dispatch
==
pDataSink
->
info
.
type
)
{
return
createDataDispatcher
(
pDataSink
,
pHandle
);
}
return
TSDB_CODE_FAILED
;
}
int32_t
dsPutDataBlock
(
DataSinkHandle
handle
,
const
SDataResult
*
pRes
)
{
SDataSinkHandle
*
pHandleImpl
=
(
SDataSinkHandle
*
)
handle
;
return
pHandleImpl
->
fPut
(
pHandleImpl
,
pRes
);
}
int32_t
dsGetDataLength
(
DataSinkHandle
handle
)
{
// todo
}
int32_t
dsGetDataBlock
(
DataSinkHandle
handle
,
char
*
pData
,
int32_t
*
pLen
)
{
SDataSinkHandle
*
pHandleImpl
=
(
SDataSinkHandle
*
)
handle
;
return
pHandleImpl
->
fGet
(
pHandleImpl
,
pData
,
pLen
);
}
int32_t
dsGetStatus
(
DataSinkHandle
handle
)
{
// todo
}
void
dsScheduleProcess
(
void
*
ahandle
,
void
*
pItem
)
{
// todo
}
void
dsDestroyDataSinker
(
DataSinkHandle
handle
)
{
SDataSinkHandle
*
pHandleImpl
=
(
SDataSinkHandle
*
)
handle
;
pHandleImpl
->
fDestroy
(
pHandleImpl
);
}
source/libs/parser/src/insertParser.c
浏览文件 @
a5dd57de
...
...
@@ -624,7 +624,7 @@ int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
if
(
NULL
==
context
.
pVgroupsHashObj
||
NULL
==
context
.
pTableBlockHashObj
||
NULL
==
context
.
pOutput
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
TSDB_CODE_
FAILED
;
return
TSDB_CODE_
TSC_OUT_OF_MEMORY
;
}
*
pInfo
=
context
.
pOutput
;
...
...
@@ -637,5 +637,5 @@ int32_t parseInsertSql(SParseContext* pContext, SVnodeModifOpStmtInfo** pInfo) {
}
destroyInsertParseContext
(
&
context
);
terrno
=
code
;
return
(
TSDB_CODE_SUCCESS
==
code
?
TSDB_CODE_SUCCESS
:
TSDB_CODE_FAILED
)
;
return
code
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录