Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
369b5559
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
369b5559
编写于
1月 11, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
1月 11, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9744 from taosdata/feature/qnode
Feature/qnode
上级
442f0db1
6677dfba
变更
41
展开全部
隐藏空白更改
内联
并排
Showing
41 changed file
with
2824 addition
and
2321 deletion
+2824
-2321
include/common/common.h
include/common/common.h
+6
-0
include/common/tmsg.h
include/common/tmsg.h
+8
-0
include/dnode/vnode/tsdb/tsdb.h
include/dnode/vnode/tsdb/tsdb.h
+134
-0
include/libs/executor/dataSinkMgt.h
include/libs/executor/dataSinkMgt.h
+2
-3
include/libs/executor/executor.h
include/libs/executor/executor.h
+20
-14
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+7
-5
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+4
-2
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+56
-13
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+448
-438
source/dnode/vnode/impl/src/vnodeQuery.c
source/dnode/vnode/impl/src/vnodeQuery.c
+1
-1
source/dnode/vnode/tsdb/CMakeLists.txt
source/dnode/vnode/tsdb/CMakeLists.txt
+1
-0
source/dnode/vnode/tsdb/src/tsdbCommit.c
source/dnode/vnode/tsdb/src/tsdbCommit.c
+1
-1
source/dnode/vnode/tsdb/src/tsdbRead.c
source/dnode/vnode/tsdb/src/tsdbRead.c
+1066
-1086
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+1
-0
source/libs/executor/CMakeLists.txt
source/libs/executor/CMakeLists.txt
+23
-8
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+2
-2
source/libs/executor/inc/executorInt.h
source/libs/executor/inc/executorInt.h
+1
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+29
-28
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+6
-1
source/libs/executor/src/dataSinkMgt.c
source/libs/executor/src/dataSinkMgt.c
+1
-0
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+4
-4
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+76
-190
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+143
-179
source/libs/executor/test/CMakeLists.txt
source/libs/executor/test/CMakeLists.txt
+18
-0
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+221
-0
source/libs/parser/src/parser.c
source/libs/parser/src/parser.c
+3
-0
source/libs/planner/inc/plannerInt.h
source/libs/planner/inc/plannerInt.h
+1
-1
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+8
-7
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+10
-3
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+12
-13
source/libs/planner/test/phyPlanTests.cpp
source/libs/planner/test/phyPlanTests.cpp
+3
-0
source/libs/qworker/CMakeLists.txt
source/libs/qworker/CMakeLists.txt
+21
-5
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+8
-6
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+195
-119
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+0
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+173
-113
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+67
-34
source/os/src/osSysinfo.c
source/os/src/osSysinfo.c
+1
-1
src/inc/tsdb.h
src/inc/tsdb.h
+8
-8
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+2
-2
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+33
-33
未找到文件。
include/common/common.h
浏览文件 @
369b5559
...
...
@@ -62,6 +62,12 @@ typedef struct SConstantItem {
SVariant
value
;
}
SConstantItem
;
typedef
struct
{
uint32_t
numOfTables
;
SArray
*
pGroupList
;
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
}
STableGroupInfo
;
typedef
struct
SSDataBlock
{
SColumnDataAgg
*
pBlockAgg
;
SArray
*
pDataBlock
;
// SArray<SColumnInfoData>
...
...
include/common/tmsg.h
浏览文件 @
369b5559
...
...
@@ -986,6 +986,14 @@ typedef struct {
char
msg
[];
}
SSubQueryMsg
;
typedef
struct
{
SMsgHead
header
;
uint64_t
sId
;
uint64_t
queryId
;
uint64_t
taskId
;
}
SSinkDataReq
;
typedef
struct
{
SMsgHead
header
;
uint64_t
sId
;
...
...
include/dnode/vnode/tsdb/tsdb.h
浏览文件 @
369b5559
...
...
@@ -18,6 +18,7 @@
#include "mallocator.h"
#include "meta.h"
#include "common.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -39,6 +40,10 @@ typedef struct STable {
STSchema
*
pSchema
;
}
STable
;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
...
...
@@ -58,6 +63,22 @@ typedef struct STsdbCfg {
int8_t
compression
;
}
STsdbCfg
;
// query condition to build multi-table data block iterator
typedef
struct
STsdbQueryCond
{
STimeWindow
twindow
;
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfo
*
colList
;
bool
loadExternalRows
;
// load external rows or not
int32_t
type
;
// data block load type:
}
STsdbQueryCond
;
typedef
struct
{
void
*
pTable
;
TSKEY
lastKey
;
uint64_t
uid
;
}
STableKeyInfo
;
// STsdb
STsdb
*
tsdbOpen
(
const
char
*
path
,
int32_t
vgId
,
const
STsdbCfg
*
pTsdbCfg
,
SMemAllocatorFactory
*
pMAF
,
SMeta
*
pMeta
);
void
tsdbClose
(
STsdb
*
);
...
...
@@ -70,6 +91,119 @@ int tsdbCommit(STsdb *pTsdb);
int
tsdbOptionsInit
(
STsdbCfg
*
);
void
tsdbOptionsClear
(
STsdbCfg
*
);
typedef
void
*
tsdbReadHandleT
;
/**
* Get the data block iterator, starting from position according to the query condition
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
* group by condition
* @param qinfo query info handle from query processor
* @return
*/
tsdbReadHandleT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
uint64_t
qId
,
void
*
pRef
);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
* Note that only one data block with only row will be returned while invoking retrieve data block function for
* all tables in this group.
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfo table list.
* @return
*/
//tsdbReadHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
// SMemRef *pRef);
tsdbReadHandleT
tsdbQueryCacheLast
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
void
*
pMemRef
);
bool
isTsdbCacheLastRow
(
tsdbReadHandleT
*
pTsdbReadHandle
);
/**
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t
tsdbGetNumOfRowsInMemTable
(
tsdbReadHandleT
*
pHandle
);
/**
* move to next block if exists
*
* @param pTsdbReadHandle
* @return
*/
bool
tsdbNextDataBlock
(
tsdbReadHandleT
pTsdbReadHandle
);
/**
* Get current data block information
*
* @param pTsdbReadHandle
* @param pBlockInfo
* @return
*/
void
tsdbRetrieveDataBlockInfo
(
tsdbReadHandleT
*
pTsdbReadHandle
,
SDataBlockInfo
*
pBlockInfo
);
/**
*
* Get the pre-calculated information w.r.t. current data block.
*
* In case of data block in cache, the pBlockStatis will always be NULL.
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t
tsdbRetrieveDataBlockStatisInfo
(
tsdbReadHandleT
*
pTsdbReadHandle
,
SDataStatis
**
pBlockStatis
);
/**
*
* The query condition with primary timestamp is passed to iterator during its constructor function,
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
*
* @param pTsdbReadHandle query handle
* @param pColumnIdList required data columns id list
* @return
*/
SArray
*
tsdbRetrieveDataBlock
(
tsdbReadHandleT
*
pTsdbReadHandle
,
SArray
*
pColumnIdList
);
/**
* destroy the created table group list, which is generated by tag query
* @param pGroupList
*/
void
tsdbDestroyTableGroup
(
STableGroupInfo
*
pGroupList
);
/**
* create the table group result including only one table, used to handle the normal table query
*
* @param tsdb tsdbHandle
* @param uid table uid
* @param pGroupInfo the generated result
* @return
*/
int32_t
tsdbGetOneTableGroup
(
STsdb
*
tsdb
,
uint64_t
uid
,
TSKEY
startKey
,
STableGroupInfo
*
pGroupInfo
);
/**
*
* @param tsdb
* @param pTableIdList
* @param pGroupInfo
* @return
*/
int32_t
tsdbGetTableGroupFromIdList
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
/**
* clean up the query handle
* @param queryHandle
*/
void
tsdbCleanupQueryHandle
(
tsdbReadHandleT
queryHandle
);
#ifdef __cplusplus
}
#endif
...
...
include/libs/executor/dataSinkMgt.h
浏览文件 @
369b5559
...
...
@@ -22,6 +22,7 @@ extern "C" {
#include "os.h"
#include "thash.h"
#include "executor.h"
#define DS_BUF_LOW 1
#define DS_BUF_FULL 2
...
...
@@ -31,14 +32,12 @@ struct SDataSink;
struct
SSDataBlock
;
typedef
struct
SDataSinkMgtCfg
{
uint32_t
maxDataBlockNum
;
uint32_t
maxDataBlockNum
;
// todo: this should be numOfRows?
uint32_t
maxDataBlockNumPerQuery
;
}
SDataSinkMgtCfg
;
int32_t
dsDataSinkMgtInit
(
SDataSinkMgtCfg
*
cfg
);
typedef
void
*
DataSinkHandle
;
typedef
struct
SInputData
{
const
struct
SSDataBlock
*
pData
;
SHashObj
*
pTableRetrieveTsMap
;
...
...
include/libs/executor/executor.h
浏览文件 @
369b5559
...
...
@@ -21,24 +21,30 @@ extern "C" {
#endif
typedef
void
*
qTaskInfo_t
;
typedef
void
*
DataSinkHandle
;
struct
SSubplan
;
/**
* create the qinfo object according to QueryTableMsg
* @param tsdb
* @param pQueryTableMsg
* @param pTaskInfo
* @return
*/
int32_t
qCreateTask
(
void
*
tsdb
,
int32_t
vgId
,
void
*
pQueryTableMsg
,
qTaskInfo_t
*
pTaskInfo
,
uint64_t
qId
);
/**
* Create the exec task object according to task json
* @param tsdb
* @param vgId
* @param pTaskInfoMsg
* @param pTaskInfo
* @param qId
* @return
*/
int32_t
qCreateExecTask
(
void
*
tsdb
,
int32_t
vgId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
);
/**
*
the main query
execution function, including query on both table and multiple tables,
*
The main task
execution function, including query on both table and multiple tables,
* which are decided according to the tag or table name query conditions
*
* @param qinfo
* @param tinfo
* @param handle
* @return
*/
bool
qExecTask
(
qTaskInfo_t
qinfo
,
uint64_t
*
qId
);
int32_t
qExecTask
(
qTaskInfo_t
tinfo
,
DataSinkHandle
*
handle
);
/**
* Retrieve the produced results information, if current query is not paused or completed,
...
...
@@ -60,7 +66,7 @@ int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspCo
* @param contLen payload length
* @return
*/
int32_t
qDumpRetrieveResult
(
qTaskInfo_t
qinfo
,
SRetrieveTableRsp
**
pRsp
,
int32_t
*
contLen
,
bool
*
continueExec
);
//
int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
/**
* return the transporter context (RPC)
...
...
@@ -81,7 +87,7 @@ int32_t qKillTask(qTaskInfo_t qinfo);
* @param qinfo
* @return
*/
int32_t
qIs
Query
Completed
(
qTaskInfo_t
qinfo
);
int32_t
qIs
Task
Completed
(
qTaskInfo_t
qinfo
);
/**
* destroy query info structure
...
...
@@ -113,7 +119,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
* @param numOfIndex
* @return
*/
int32_t
qCreateTableGroupByGroupExpr
(
SArray
*
pTableIdList
,
TSKEY
skey
,
STableGroupInfo
groupInfo
,
SColIndex
*
groupByIndex
,
int32_t
numOfIndex
);
//
int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex* groupByIndex, int32_t numOfIndex);
/**
* Update the table id list of a given query.
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
369b5559
...
...
@@ -20,8 +20,10 @@
extern
"C"
{
#endif
#include "planner.h"
#include "catalog.h"
#include "planner.h"
struct
SSchJob
;
typedef
struct
SSchedulerCfg
{
uint32_t
maxJobNum
;
...
...
@@ -65,7 +67,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t
scheduleExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
void
**
pJob
,
SQueryResult
*
pRes
);
int32_t
scheduleExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
struct
SSchJob
**
pJob
,
SQueryResult
*
pRes
);
/**
* Process the query job, generated according to the query physical plan.
...
...
@@ -73,7 +75,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t
scheduleAsyncExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
void
**
pJob
);
int32_t
scheduleAsyncExecJob
(
void
*
transport
,
SArray
*
nodeList
,
SQueryDag
*
pDag
,
struct
SSchJob
**
pJob
);
/**
* Fetch query result from the remote query executor
...
...
@@ -81,7 +83,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
* @param data
* @return
*/
int32_t
scheduleFetchRows
(
void
*
pJob
,
void
**
data
);
int32_t
scheduleFetchRows
(
struct
SSchJob
*
pJob
,
void
**
data
);
/**
...
...
@@ -89,7 +91,7 @@ int32_t scheduleFetchRows(void *pJob, void **data);
* @param pJob
* @return
*/
int32_t
scheduleCancelJob
(
void
*
pJob
);
//
int32_t scheduleCancelJob(void *pJob);
/**
* Free the query job
...
...
source/client/inc/clientInt.h
浏览文件 @
369b5559
...
...
@@ -113,6 +113,7 @@ typedef struct SRequestSendRecvBody {
tsem_t
rspSem
;
// not used now
void
*
fp
;
SShowReqInfo
showInfo
;
// todo this attribute will be removed after the query framework being completed.
struct
SSchJob
*
pQueryJob
;
// query job, created according to sql query DAG.
SDataBuf
requestMsg
;
SReqResultInfo
resInfo
;
}
SRequestSendRecvBody
;
...
...
@@ -129,7 +130,7 @@ typedef struct SRequestObj {
char
*
msgBuf
;
void
*
pInfo
;
// sql parse info, generated by parser module
int32_t
code
;
uint64_t
affectedRows
;
uint64_t
affectedRows
;
// todo remove it
SQueryExecMetric
metric
;
SRequestSendRecvBody
body
;
}
SRequestObj
;
...
...
@@ -161,12 +162,13 @@ int taos_options_imp(TSDB_OPTION option, const char *str);
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
,
int32_t
numOfThreads
);
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
initMsgHandleFp
();
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
);
void
*
doFetchRow
(
SRequestObj
*
pRequest
);
void
setResultDataPtr
(
SReqResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
);
#ifdef __cplusplus
...
...
source/client/src/clientImpl.c
浏览文件 @
369b5559
#include "../../libs/scheduler/inc/schedulerInt.h"
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
#include "tdef.h"
#include "tep.h"
#include "tglobal.h"
...
...
@@ -8,9 +12,6 @@
#include "tnote.h"
#include "tpagedfile.h"
#include "tref.h"
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
#define CHECK_CODE_GOTO(expr, lable) \
do { \
...
...
@@ -57,6 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
}
static
STscObj
*
taosConnectImpl
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
,
__taos_async_fn_t
fp
,
void
*
param
,
SAppInstInfo
*
pAppInfo
);
static
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SDataBlockSchema
*
pDataBlockSchema
);
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
)
{
if
(
taos_init
()
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -197,19 +199,49 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQueryNode
*
pQueryNode
,
SQueryDag
**
pDag
)
{
pRequest
->
type
=
pQueryNode
->
type
;
return
qCreateQueryDag
(
pQueryNode
,
pDag
,
pRequest
->
requestId
);
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
int32_t
code
=
qCreateQueryDag
(
pQueryNode
,
pDag
,
pRequest
->
requestId
);
if
(
code
!=
0
)
{
return
code
;
}
if
(
pQueryNode
->
type
==
TSDB_SQL_SELECT
)
{
SArray
*
pa
=
taosArrayGetP
((
*
pDag
)
->
pSubplans
,
0
);
SSubplan
*
pPlan
=
taosArrayGetP
(
pa
,
0
);
SDataBlockSchema
*
pDataBlockSchema
=
&
(
pPlan
->
pDataSink
->
schema
);
setResSchemaInfo
(
pResInfo
,
pDataBlockSchema
);
pRequest
->
type
=
TDMT_VND_QUERY
;
}
return
code
;
}
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SDataBlockSchema
*
pDataBlockSchema
)
{
assert
(
pDataBlockSchema
!=
NULL
&&
pDataBlockSchema
->
numOfCols
>
0
);
pResInfo
->
numOfCols
=
pDataBlockSchema
->
numOfCols
;
pResInfo
->
fields
=
calloc
(
pDataBlockSchema
->
numOfCols
,
sizeof
(
pDataBlockSchema
->
pSchema
[
0
]));
for
(
int32_t
i
=
0
;
i
<
pResInfo
->
numOfCols
;
++
i
)
{
SSchema
*
pSchema
=
&
pDataBlockSchema
->
pSchema
[
i
];
pResInfo
->
fields
[
i
].
bytes
=
pSchema
->
bytes
;
pResInfo
->
fields
[
i
].
type
=
pSchema
->
type
;
tstrncpy
(
pResInfo
->
fields
[
i
].
name
,
pSchema
[
i
].
name
,
tListLen
(
pResInfo
->
fields
[
i
].
name
));
}
}
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryDag
*
pDag
,
void
**
pJob
)
{
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryDag
*
pDag
)
{
if
(
TSDB_SQL_INSERT
==
pRequest
->
type
||
TSDB_SQL_CREATE_TABLE
==
pRequest
->
type
)
{
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
,
.
msgSize
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
msg
=
pRequest
->
msgBuf
};
int32_t
code
=
scheduleExecJob
(
pRequest
->
pTscObj
->
pTransporter
,
NULL
,
pDag
,
p
Job
,
&
res
);
int32_t
code
=
scheduleExecJob
(
pRequest
->
pTscObj
->
pTransporter
,
NULL
,
pDag
,
&
pRequest
->
body
.
pQuery
Job
,
&
res
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// handle error and retry
}
else
{
if
(
*
p
Job
!=
NULL
)
{
scheduleFreeJob
(
*
p
Job
);
if
(
pRequest
->
body
.
pQuery
Job
!=
NULL
)
{
scheduleFreeJob
(
pRequest
->
body
.
pQuery
Job
);
}
}
...
...
@@ -217,7 +249,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
return
res
.
code
;
}
return
scheduleAsyncExecJob
(
pRequest
->
pTscObj
->
pTransporter
,
NULL
/*todo appInfo.xxx*/
,
pDag
,
p
Job
);
return
scheduleAsyncExecJob
(
pRequest
->
pTscObj
->
pTransporter
,
NULL
,
pDag
,
&
pRequest
->
body
.
pQuery
Job
);
}
TAOS_RES
*
tmq_create_topic
(
TAOS
*
taos
,
const
char
*
name
,
const
char
*
sql
,
int
sqlLen
)
{
...
...
@@ -294,7 +326,6 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
SRequestObj
*
pRequest
=
NULL
;
SQueryNode
*
pQuery
=
NULL
;
SQueryDag
*
pDag
=
NULL
;
void
*
pJob
=
NULL
;
terrno
=
TSDB_CODE_SUCCESS
;
CHECK_CODE_GOTO
(
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
),
_return
);
...
...
@@ -304,9 +335,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
CHECK_CODE_GOTO
(
execDdlQuery
(
pRequest
,
pQuery
),
_return
);
}
else
{
CHECK_CODE_GOTO
(
getPlan
(
pRequest
,
pQuery
,
&
pDag
),
_return
);
CHECK_CODE_GOTO
(
scheduleQuery
(
pRequest
,
pDag
,
&
pJob
),
_return
);
CHECK_CODE_GOTO
(
scheduleQuery
(
pRequest
,
pDag
),
_return
);
pRequest
->
code
=
terrno
;
return
pRequest
;
}
_return:
...
...
@@ -315,6 +345,7 @@ _return:
if
(
NULL
!=
pRequest
&&
TSDB_CODE_SUCCESS
!=
terrno
)
{
pRequest
->
code
=
terrno
;
}
return
pRequest
;
}
...
...
@@ -513,7 +544,17 @@ void* doFetchRow(SRequestObj* pRequest) {
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
if
(
pResultInfo
->
pData
==
NULL
||
pResultInfo
->
current
>=
pResultInfo
->
numOfRows
)
{
if
(
pRequest
->
type
==
TDMT_MND_SHOW
)
{
if
(
pRequest
->
type
==
TDMT_VND_QUERY
)
{
pRequest
->
type
=
TDMT_VND_FETCH
;
scheduleFetchRows
(
pRequest
->
body
.
pQueryJob
,
(
void
**
)
&
pRequest
->
body
.
resInfo
.
pData
);
pResultInfo
->
current
=
0
;
if
(
pResultInfo
->
numOfRows
<=
pResultInfo
->
current
)
{
return
NULL
;
}
goto
_return
;
}
else
if
(
pRequest
->
type
==
TDMT_MND_SHOW
)
{
pRequest
->
type
=
TDMT_MND_SHOW_RETRIEVE
;
}
else
if
(
pRequest
->
type
==
TDMT_VND_SHOW_TABLES
)
{
pRequest
->
type
=
TDMT_VND_SHOW_TABLES_FETCH
;
...
...
@@ -556,6 +597,8 @@ void* doFetchRow(SRequestObj* pRequest) {
}
}
_return:
for
(
int32_t
i
=
0
;
i
<
pResultInfo
->
numOfCols
;
++
i
)
{
pResultInfo
->
row
[
i
]
=
pResultInfo
->
pCol
[
i
]
+
pResultInfo
->
fields
[
i
].
bytes
*
pResultInfo
->
current
;
if
(
IS_VAR_DATA_TYPE
(
pResultInfo
->
fields
[
i
].
type
))
{
...
...
source/client/test/clientTests.cpp
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/dnode/vnode/impl/src/vnodeQuery.c
浏览文件 @
369b5559
...
...
@@ -23,7 +23,7 @@ int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery);
int
vnodeProcessQueryReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
vTrace
(
"query message is processed"
);
return
qWorkerProcessQueryMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
return
qWorkerProcessQueryMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
}
int
vnodeProcessFetchReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
...
...
source/dnode/vnode/tsdb/CMakeLists.txt
浏览文件 @
369b5559
...
...
@@ -13,6 +13,7 @@ else(0)
"src/tsdbReadImpl.c"
"src/tsdbFile.c"
"src/tsdbFS.c"
"src/tsdbRead.c"
)
endif
(
0
)
...
...
source/dnode/vnode/tsdb/src/tsdbCommit.c
浏览文件 @
369b5559
...
...
@@ -1253,7 +1253,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
pBlock
->
keyFirst
=
dataColsKeyFirst
(
pDataCols
);
pBlock
->
keyLast
=
dataColsKeyLast
(
pDataCols
);
tsdbDebug
(
"vgId:%d
tid:%d
a block of data is written to file %s, offset %"
PRId64
tsdbDebug
(
"vgId:%d
uid:%"
PRId64
"
a block of data is written to file %s, offset %"
PRId64
" numOfRows %d len %d numOfCols %"
PRId16
" keyFirst %"
PRId64
" keyLast %"
PRId64
,
REPO_ID
(
pRepo
),
TABLE_TID
(
pTable
),
TSDB_FILE_FULL_NAME
(
pDFile
),
offset
,
rowsToWrite
,
pBlock
->
len
,
pBlock
->
numOfCols
,
pBlock
->
keyFirst
,
pBlock
->
keyLast
);
...
...
source/dnode/vnode/tsdb/src/tsdbRead.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
369b5559
无法预览此类型文件
source/libs/executor/CMakeLists.txt
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/executor/inc/executil.h
浏览文件 @
369b5559
...
...
@@ -38,7 +38,7 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES)
#define GET_
QID(_r) (((SQInfo*)((_r)->qinfo))->q
Id)
#define GET_
TASKID(_t) (((SExecTaskInfo*)(_t))->id.query
Id)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
...
...
@@ -157,6 +157,6 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t
mergeIntoGroupResult
(
SGroupResInfo
*
pGroupResInfo
,
struct
STaskRuntimeEnv
*
pRuntimeEnv
,
int32_t
*
offset
);
int32_t
initUdfInfo
(
struct
SUdfInfo
*
pUdfInfo
);
//
int32_t initUdfInfo(struct SUdfInfo* pUdfInfo);
#endif // TDENGINE_QUERYUTIL_H
source/libs/executor/inc/executorInt.h
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/executor/inc/executorimpl.h
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/executor/src/dataDispatcher.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/executor/src/dataSinkMgt.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/executor/src/executil.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/executor/src/executorMain.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/executor/src/executorimpl.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/executor/test/CMakeLists.txt
0 → 100644
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/executor/test/executorTests.cpp
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/parser/src/parser.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/planner/inc/plannerInt.h
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/planner/src/physicalPlan.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/planner/src/planner.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/planner/test/phyPlanTests.cpp
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/qworker/CMakeLists.txt
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/qworker/src/qworker.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/scheduler/src/scheduler.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
source/os/src/osSysinfo.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
src/inc/tsdb.h
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
src/query/inc/qExecutor.h
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
src/query/src/qExecutor.c
浏览文件 @
369b5559
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录