Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a5902a2e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a5902a2e
编写于
1月 17, 2022
作者:
D
dapan1121
提交者:
GitHub
1月 17, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #9860 from taosdata/feature/qnode2
Feature/qnode2
上级
a40ebba7
e10c66d8
变更
14
展开全部
隐藏空白更改
内联
并排
Showing
14 changed file
with
1557 addition
and
1132 deletion
+1557
-1132
include/libs/executor/dataSinkMgt.h
include/libs/executor/dataSinkMgt.h
+1
-1
include/libs/qworker/qworker.h
include/libs/qworker/qworker.h
+1
-1
include/libs/scheduler/scheduler.h
include/libs/scheduler/scheduler.h
+16
-0
include/util/taoserror.h
include/util/taoserror.h
+5
-1
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+1
-1
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+15
-8
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+3
-0
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+1
-1
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+71
-27
source/libs/qworker/inc/qworkerMsg.h
source/libs/qworker/inc/qworkerMsg.h
+48
-0
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+743
-1092
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+553
-0
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+91
-0
source/util/src/terror.c
source/util/src/terror.c
+8
-0
未找到文件。
include/libs/executor/dataSinkMgt.h
浏览文件 @
a5902a2e
...
...
@@ -48,7 +48,7 @@ typedef struct SOutputData {
int8_t
compressed
;
char
*
pData
;
bool
queryEnd
;
bool
needSchedule
;
int32_t
scheduleJobNo
;
int32_t
bufStatus
;
int64_t
useconds
;
int8_t
precision
;
...
...
include/libs/qworker/qworker.h
浏览文件 @
a5902a2e
...
...
@@ -55,7 +55,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcess
QueryContinue
Msg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcess
CQuery
Msg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessDataSinkMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
...
...
include/libs/scheduler/scheduler.h
浏览文件 @
a5902a2e
...
...
@@ -59,6 +59,11 @@ typedef struct SQueryResult {
char
*
msg
;
}
SQueryResult
;
typedef
struct
STaskInfo
{
SQueryNodeAddr
addr
;
SSubQueryMsg
*
msg
;
}
STaskInfo
;
int32_t
schedulerInit
(
SSchedulerCfg
*
cfg
);
/**
...
...
@@ -101,6 +106,17 @@ void scheduleFreeJob(void *pJob);
void
schedulerDestroy
(
void
);
/**
* convert dag to task list
* @param pDag
* @param pTasks SArray**<STaskInfo>
* @return
*/
int32_t
schedulerConvertDagToTaskList
(
SQueryDag
*
pDag
,
SArray
**
pTasks
);
void
schedulerFreeTaskList
(
SArray
*
taskList
);
#ifdef __cplusplus
}
#endif
...
...
include/util/taoserror.h
浏览文件 @
a5902a2e
...
...
@@ -356,7 +356,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist")
#define TSDB_CODE_QRY_RES_CACHE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task result cache not exist")
#define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled")
#define TSDB_CODE_QRY_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0715) //"Task dropped")
#define TSDB_CODE_QRY_TASK_CANCELLING TAOS_DEF_ERROR_CODE(0, 0x0716) //"Task cancelling")
#define TSDB_CODE_QRY_TASK_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0717) //"Task dropping")
#define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation")
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired")
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
a5902a2e
...
...
@@ -28,7 +28,7 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case
TDMT_VND_QUERY
:
return
qWorkerProcessQueryMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_QUERY_CONTINUE
:
return
qWorkerProcess
QueryContinue
Msg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
return
qWorkerProcess
CQuery
Msg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
case
TDMT_VND_SCHEDULE_DATA_SINK
:
return
qWorkerProcessDataSinkMsg
(
pVnode
->
pTsdb
,
pVnode
->
pQuery
,
pMsg
);
default:
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
a5902a2e
...
...
@@ -47,6 +47,11 @@ enum {
CTG_RENT_STABLE
,
};
typedef
struct
SCTGDebug
{
int32_t
lockDebug
;
}
SCTGDebug
;
typedef
struct
SVgroupListCache
{
int32_t
vgroupVersion
;
SHashObj
*
cache
;
// key:vgId, value:SVgroupInfo
...
...
@@ -134,20 +139,22 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define CTG_LOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
qDebug
("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
qDebug
("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
qDebug
("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
qDebug
("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
...
...
@@ -155,15 +162,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
#define CTG_UNLOCK(type, _lock) do { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
qDebug
("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
qDebug
("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
qDebug
("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
qDebug
("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG
("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
a5902a2e
...
...
@@ -20,6 +20,9 @@
SCatalogMgmt
ctgMgmt
=
{
0
};
SCTGDebug
gCTGDebug
=
{
0
};
int32_t
ctgGetDBVgroupFromCache
(
struct
SCatalog
*
pCatalog
,
const
char
*
dbName
,
SDBVgroupInfo
**
dbInfo
,
bool
*
inCache
)
{
if
(
NULL
==
pCatalog
->
dbCache
.
cache
)
{
*
inCache
=
false
;
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
a5902a2e
...
...
@@ -196,7 +196,7 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
pOutput
->
bufStatus
=
updateStatus
(
pDispatcher
);
pthread_mutex_lock
(
&
pDispatcher
->
mutex
);
pOutput
->
queryEnd
=
pDispatcher
->
queryEnd
;
pOutput
->
needSchedule
=
false
;
pOutput
->
scheduleJobNo
=
0
;
pOutput
->
useconds
=
pDispatcher
->
useconds
;
pOutput
->
precision
=
pDispatcher
->
schema
.
precision
;
pthread_mutex_unlock
(
&
pDispatcher
->
mutex
);
...
...
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
a5902a2e
...
...
@@ -27,14 +27,30 @@ extern "C" {
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
enum
{
QW_READY_NOT_RECEIVED
=
0
,
QW_READY_RECEIVED
,
QW_READY_RESPONSED
,
QW_PHASE_PRE_QUERY
=
1
,
QW_PHASE_POST_QUERY
,
QW_PHASE_PRE_CQUERY
,
QW_PHASE_POST_CQUERY
,
QW_PHASE_PRE_SINK
,
QW_PHASE_POST_SINK
,
QW_PHASE_PRE_FETCH
,
QW_PHASE_POST_FETCH
,
};
enum
{
QW_TASK_INFO_STATUS
=
1
,
QW_TASK_INFO_READY
,
QW_EVENT_CANCEL
=
1
,
QW_EVENT_READY
,
QW_EVENT_FETCH
,
QW_EVENT_DROP
,
QW_EVENT_CQUERY
,
QW_EVENT_MAX
,
};
enum
{
QW_EVENT_NOT_RECEIVED
=
0
,
QW_EVENT_RECEIVED
,
QW_EVENT_PROCESSED
,
};
enum
{
...
...
@@ -57,21 +73,45 @@ enum {
QW_ADD_ACQUIRE
,
};
typedef
struct
SQWDebug
{
int32_t
lockDebug
;
}
SQWDebug
;
typedef
struct
SQWMsg
{
void
*
node
;
char
*
msg
;
int32_t
msgLen
;
void
*
connection
;
}
SQWMsg
;
typedef
struct
SQWPhaseInput
{
int8_t
status
;
int32_t
code
;
qTaskInfo_t
taskHandle
;
DataSinkHandle
sinkHandle
;
}
SQWPhaseInput
;
typedef
struct
SQWPhaseOutput
{
int32_t
rspCode
;
bool
needStop
;
bool
needRsp
;
}
SQWPhaseOutput
;
typedef
struct
SQWTaskStatus
{
SRWLatch
lock
;
int32_t
code
;
int8_t
status
;
int8_t
ready
;
bool
cancel
;
bool
drop
;
}
SQWTaskStatus
;
typedef
struct
SQWTaskCtx
{
SRWLatch
lock
;
int8_t
sinkScheduled
;
int8_t
queryScheduled
;
int32_t
phase
;
int32_t
sinkId
;
int32_t
readyCode
;
int8_t
events
[
QW_EVENT_MAX
];
bool
needRsp
;
qTaskInfo_t
taskHandle
;
DataSinkHandle
sinkHandle
;
}
SQWTaskCtx
;
...
...
@@ -95,15 +135,22 @@ typedef struct SQWorkerMgmt {
putReqToQueryQFp
putToQueueFp
;
}
SQWorkerMgmt
;
#define QW_GOT_RES_DATA(data) (true)
#define QW_LOW_RES_DATA(data) (false)
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId
#define QW_IDS() sId, qId, tId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
#define QW_IN_EXECUTOR(ctx) ((ctx)->phase == QW_PHASE_PRE_QUERY || (ctx)->phase == QW_PHASE_PRE_CQUERY || (ctx)->phase == QW_PHASE_PRE_FETCH || (ctx)->phase == QW_PHASE_PRE_SINK)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0)
#define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0)
#define QW_IDS() sId, qId, tId
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
...
...
@@ -123,21 +170,22 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define QW_LOCK(type, _lock) do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
qDebug
("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
qDebug
("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
qDebug
("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
qDebug
("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
...
...
@@ -145,25 +193,21 @@ typedef struct SQWorkerMgmt {
#define QW_UNLOCK(type, _lock) do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
qDebug
("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
qDebug
("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
qDebug
("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
qDebug
("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG
("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
int32_t
qwAcquireScheduler
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SQWSchStatus
**
sch
);
int32_t
qwAcquireAddScheduler
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
SQWSchStatus
**
sch
);
int32_t
qwAcquireTask
(
SQWorkerMgmt
*
mgmt
,
int32_t
rwType
,
SQWSchStatus
*
sch
,
uint64_t
qId
,
uint64_t
tId
,
SQWTaskStatus
**
task
);
#ifdef __cplusplus
}
...
...
source/libs/qworker/inc/qworkerMsg.h
0 → 100644
浏览文件 @
a5902a2e
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_QWORKER_MSG_H_
#define _TD_QWORKER_MSG_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#include "qworkerInt.h"
#include "dataSinkMgt.h"
int32_t
qwProcessQuery
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessCQuery
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessReady
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessFetch
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessDrop
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
SQWMsg
*
qwMsg
);
int32_t
qwBuildAndSendDropRsp
(
void
*
connection
,
int32_t
code
);
int32_t
qwBuildAndSendFetchRsp
(
void
*
connection
,
SRetrieveTableRsp
*
pRsp
,
int32_t
dataLength
,
int32_t
code
);
void
qwBuildFetchRsp
(
void
*
msg
,
SOutputData
*
input
,
int32_t
len
);
int32_t
qwBuildAndSendCQueryMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
);
int32_t
qwBuildAndSendSchSinkMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
);
int32_t
qwBuildAndSendReadyRsp
(
void
*
connection
,
int32_t
code
);
int32_t
qwBuildAndSendQueryRsp
(
void
*
connection
,
int32_t
code
);
void
qwFreeFetchRsp
(
void
*
msg
);
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_QWORKER_INT_H_*/
source/libs/qworker/src/qworker.c
浏览文件 @
a5902a2e
此差异已折叠。
点击以展开。
source/libs/qworker/src/qworkerMsg.c
0 → 100644
浏览文件 @
a5902a2e
#include "qworker.h"
#include <common.h>
#include "executor.h"
#include "planner.h"
#include "query.h"
#include "qworkerInt.h"
#include "qworkerMsg.h"
#include "tmsg.h"
#include "tname.h"
#include "dataSinkMgt.h"
int32_t
qwMallocFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
)
{
int32_t
msgSize
=
sizeof
(
SRetrieveTableRsp
)
+
length
;
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
msgSize
);
if
(
NULL
==
pRsp
)
{
qError
(
"rpcMallocCont %d failed"
,
msgSize
);
QW_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
memset
(
pRsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
*
rsp
=
pRsp
;
return
TSDB_CODE_SUCCESS
;
}
void
qwBuildFetchRsp
(
void
*
msg
,
SOutputData
*
input
,
int32_t
len
)
{
SRetrieveTableRsp
*
rsp
=
(
SRetrieveTableRsp
*
)
msg
;
rsp
->
useconds
=
htobe64
(
input
->
useconds
);
rsp
->
completed
=
input
->
queryEnd
;
rsp
->
precision
=
input
->
precision
;
rsp
->
compressed
=
input
->
compressed
;
rsp
->
compLen
=
htonl
(
len
);
rsp
->
numOfRows
=
htonl
(
input
->
numOfRows
);
}
void
qwFreeFetchRsp
(
void
*
msg
)
{
if
(
msg
)
{
rpcFreeCont
(
msg
);
}
}
int32_t
qwBuildAndSendQueryRsp
(
void
*
connection
,
int32_t
code
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SQueryTableRsp
*
pRsp
=
(
SQueryTableRsp
*
)
rpcMallocCont
(
sizeof
(
SQueryTableRsp
));
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendReadyRsp
(
void
*
connection
,
int32_t
code
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SResReadyRsp
*
pRsp
=
(
SResReadyRsp
*
)
rpcMallocCont
(
sizeof
(
SResReadyRsp
));
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendStatusRsp
(
SRpcMsg
*
pMsg
,
SSchedulerStatusRsp
*
sStatus
)
{
int32_t
size
=
0
;
if
(
sStatus
)
{
size
=
sizeof
(
SSchedulerStatusRsp
)
+
sizeof
(
sStatus
->
status
[
0
])
*
sStatus
->
num
;
}
else
{
size
=
sizeof
(
SSchedulerStatusRsp
);
}
SSchedulerStatusRsp
*
pRsp
=
(
SSchedulerStatusRsp
*
)
rpcMallocCont
(
size
);
if
(
sStatus
)
{
memcpy
(
pRsp
,
sStatus
,
size
);
}
else
{
pRsp
->
num
=
0
;
}
SRpcMsg
rpcRsp
=
{
.
msgType
=
pMsg
->
msgType
+
1
,
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
size
,
.
code
=
0
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendFetchRsp
(
void
*
connection
,
SRetrieveTableRsp
*
pRsp
,
int32_t
dataLength
,
int32_t
code
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
if
(
NULL
==
pRsp
)
{
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
memset
(
pRsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
dataLength
=
0
;
}
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
)
+
dataLength
,
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendCancelRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
STaskCancelRsp
*
pRsp
=
(
STaskCancelRsp
*
)
rpcMallocCont
(
sizeof
(
STaskCancelRsp
));
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendDropRsp
(
void
*
connection
,
int32_t
code
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
STaskDropRsp
*
pRsp
=
(
STaskDropRsp
*
)
rpcMallocCont
(
sizeof
(
STaskDropRsp
));
pRsp
->
code
=
code
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendShowRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
int32_t
numOfCols
=
6
;
int32_t
msgSize
=
sizeof
(
SVShowTablesRsp
)
+
sizeof
(
SSchema
)
*
numOfCols
;
SVShowTablesRsp
*
pRsp
=
(
SVShowTablesRsp
*
)
rpcMallocCont
(
msgSize
);
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pRsp
->
metaInfo
.
pSchema
;
const
SSchema
*
s
=
tGetTbnameColumnSchema
();
*
pSchema
=
createSchema
(
s
->
type
,
htonl
(
s
->
bytes
),
htonl
(
++
cols
),
"name"
);
pSchema
++
;
int32_t
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
*
pSchema
=
createSchema
(
type
,
htonl
(
tDataTypes
[
type
].
bytes
),
htonl
(
++
cols
),
"created"
);
pSchema
++
;
type
=
TSDB_DATA_TYPE_SMALLINT
;
*
pSchema
=
createSchema
(
type
,
htonl
(
tDataTypes
[
type
].
bytes
),
htonl
(
++
cols
),
"columns"
);
pSchema
++
;
*
pSchema
=
createSchema
(
s
->
type
,
htonl
(
s
->
bytes
),
htonl
(
++
cols
),
"stable"
);
pSchema
++
;
type
=
TSDB_DATA_TYPE_BIGINT
;
*
pSchema
=
createSchema
(
type
,
htonl
(
tDataTypes
[
type
].
bytes
),
htonl
(
++
cols
),
"uid"
);
pSchema
++
;
type
=
TSDB_DATA_TYPE_INT
;
*
pSchema
=
createSchema
(
type
,
htonl
(
tDataTypes
[
type
].
bytes
),
htonl
(
++
cols
),
"vgId"
);
assert
(
cols
==
numOfCols
);
pRsp
->
metaInfo
.
numOfColumns
=
htonl
(
cols
);
SRpcMsg
rpcMsg
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
msgSize
,
.
code
=
code
,
};
rpcSendResponse
(
&
rpcMsg
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendShowFetchRsp
(
SRpcMsg
*
pMsg
,
SVShowTablesFetchReq
*
pFetchReq
)
{
SVShowTablesFetchRsp
*
pRsp
=
(
SVShowTablesFetchRsp
*
)
rpcMallocCont
(
sizeof
(
SVShowTablesFetchRsp
));
int32_t
handle
=
htonl
(
pFetchReq
->
id
);
pRsp
->
numOfRows
=
0
;
SRpcMsg
rpcMsg
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
0
,
};
rpcSendResponse
(
&
rpcMsg
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendSchSinkMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SSinkDataReq
*
req
=
(
SSinkDataReq
*
)
rpcMallocCont
(
sizeof
(
SSinkDataReq
));
if
(
NULL
==
req
)
{
qError
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
SSinkDataReq
));
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
req
->
header
.
vgId
=
mgmt
->
nodeId
;
req
->
sId
=
sId
;
req
->
queryId
=
qId
;
req
->
taskId
=
tId
;
SRpcMsg
pNewMsg
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
msgType
=
TDMT_VND_SCHEDULE_DATA_SINK
,
.
pCont
=
req
,
.
contLen
=
sizeof
(
SSinkDataReq
),
.
code
=
0
,
};
int32_t
code
=
(
*
mgmt
->
putToQueueFp
)(
mgmt
->
nodeObj
,
&
pNewMsg
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
qError
(
"put data sink schedule msg to queue failed, code:%x"
,
code
);
rpcFreeCont
(
req
);
QW_ERR_RET
(
code
);
}
qDebug
(
"put data sink schedule msg to query queue"
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendCQueryMsg
(
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
void
*
connection
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
connection
;
SQueryContinueReq
*
req
=
(
SQueryContinueReq
*
)
rpcMallocCont
(
sizeof
(
SQueryContinueReq
));
if
(
NULL
==
req
)
{
QW_SCH_TASK_ELOG
(
"rpcMallocCont %d failed"
,
(
int32_t
)
sizeof
(
SQueryContinueReq
));
QW_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
req
->
header
.
vgId
=
mgmt
->
nodeId
;
req
->
sId
=
sId
;
req
->
queryId
=
qId
;
req
->
taskId
=
tId
;
SRpcMsg
pNewMsg
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
msgType
=
TDMT_VND_QUERY_CONTINUE
,
.
pCont
=
req
,
.
contLen
=
sizeof
(
SQueryContinueReq
),
.
code
=
0
,
};
int32_t
code
=
(
*
mgmt
->
putToQueueFp
)(
mgmt
->
nodeObj
,
&
pNewMsg
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
QW_SCH_TASK_ELOG
(
"put query continue msg to queue failed, code:%x"
,
code
);
rpcFreeCont
(
req
);
QW_ERR_RET
(
code
);
}
QW_SCH_TASK_DLOG
(
"put query continue msg to query queue, vgId:%d"
,
mgmt
->
nodeId
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid query msg, contLen:%d"
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
msg
->
contentLen
=
ntohl
(
msg
->
contentLen
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
msg
->
msg
,
.
msgLen
=
msg
->
contentLen
,
.
connection
=
pMsg
};
QW_SCH_TASK_DLOG
(
"processQuery start, node:%p"
,
node
);
QW_RET
(
qwProcessQuery
(
QW_FPARAMS
(),
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"processQuery end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessCQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
bool
queryDone
=
false
;
SQueryContinueReq
*
msg
=
(
SQueryContinueReq
*
)
pMsg
->
pCont
;
bool
needStop
=
false
;
SQWTaskCtx
*
handles
=
NULL
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid cquery msg, contLen:%d"
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
connection
=
pMsg
};
QW_SCH_TASK_DLOG
(
"processCQuery start, node:%p"
,
node
);
QW_ERR_RET
(
qwProcessCQuery
(
QW_FPARAMS
(),
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"processCQuery end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessDataSinkMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SSinkDataReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid sink data msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
//dsScheduleProcess();
//TODO
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SResReadyReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task status msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
connection
=
pMsg
};
QW_SCH_TASK_DLOG
(
"processReady start, node:%p"
,
node
);
QW_ERR_RET
(
qwProcessReady
(
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"processReady end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessStatusMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
SSchTasksStatusReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task status msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
htobe64
(
msg
->
sId
);
SSchedulerStatusRsp
*
sStatus
=
NULL
;
//QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
_return:
QW_ERR_RET
(
qwBuildAndSendStatusRsp
(
pMsg
,
sStatus
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessFetchMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SResFetchReq
*
msg
=
pMsg
->
pCont
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
connection
=
pMsg
};
QW_SCH_TASK_DLOG
(
"processFetch start, node:%p"
,
node
);
QW_ERR_RET
(
qwProcessFetch
(
QW_FPARAMS
(),
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"processFetch end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessCancelMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
STaskCancelReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid task cancel msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
htobe64
(
msg
->
sId
);
msg
->
queryId
=
htobe64
(
msg
->
queryId
);
msg
->
taskId
=
htobe64
(
msg
->
taskId
);
//QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return:
QW_ERR_RET
(
qwBuildAndSendCancelRsp
(
pMsg
,
code
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessDropMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
STaskDropReq
*
msg
=
pMsg
->
pCont
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid task drop msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
msg
->
sId
=
be64toh
(
msg
->
sId
);
msg
->
queryId
=
be64toh
(
msg
->
queryId
);
msg
->
taskId
=
be64toh
(
msg
->
taskId
);
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
NULL
,
.
msgLen
=
0
,
.
connection
=
pMsg
};
QW_SCH_TASK_DLOG
(
"processDrop start, node:%p"
,
node
);
QW_ERR_RET
(
qwProcessDrop
(
QW_FPARAMS
(),
&
qwMsg
));
QW_SCH_TASK_DLOG
(
"processDrop end, node:%p"
,
node
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessShowMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
int32_t
code
=
0
;
SVShowTablesReq
*
pReq
=
pMsg
->
pCont
;
QW_ERR_RET
(
qwBuildAndSendShowRsp
(
pMsg
,
code
));
}
int32_t
qWorkerProcessShowFetchMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SVShowTablesFetchReq
*
pFetchReq
=
pMsg
->
pCont
;
QW_ERR_RET
(
qwBuildAndSendShowFetchRsp
(
pMsg
,
pFetchReq
));
}
source/libs/scheduler/src/scheduler.c
浏览文件 @
a5902a2e
...
...
@@ -1374,6 +1374,83 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
return
TSDB_CODE_SUCCESS
;
}
int32_t
schedulerConvertDagToTaskList
(
SQueryDag
*
pDag
,
SArray
**
pTasks
)
{
if
(
NULL
==
pDag
||
pDag
->
numOfSubplans
<=
0
||
taosArrayGetSize
(
pDag
->
pSubplans
)
<=
0
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
levelNum
=
taosArrayGetSize
(
pDag
->
pSubplans
);
if
(
1
!=
levelNum
)
{
qError
(
"invalid level num: %d"
,
levelNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SArray
*
plans
=
taosArrayGet
(
pDag
->
pSubplans
,
0
);
int32_t
taskNum
=
taosArrayGetSize
(
plans
);
if
(
taskNum
<=
0
)
{
qError
(
"invalid task num: %d"
,
taskNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SArray
*
info
=
taosArrayInit
(
taskNum
,
sizeof
(
STaskInfo
));
if
(
NULL
==
info
)
{
qError
(
"taosArrayInit %d taskInfo failed"
,
taskNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
STaskInfo
tInfo
=
{
0
};
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
int32_t
code
=
0
;
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
SSubplan
*
plan
=
taosArrayGetP
(
plans
,
i
);
tInfo
.
addr
=
plan
->
execNode
;
code
=
qSubPlanToString
(
plan
,
&
msg
,
&
msgLen
);
if
(
TSDB_CODE_SUCCESS
!=
code
||
NULL
==
msg
||
msgLen
<=
0
)
{
qError
(
"subplanToString error, code:%x, msg:%p, len:%d"
,
code
,
msg
,
msgLen
);
SCH_ERR_JRET
(
code
);
}
int32_t
msgSize
=
sizeof
(
SSubQueryMsg
)
+
msgLen
;
msg
=
calloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
qError
(
"calloc %d failed"
,
msgSize
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SSubQueryMsg
*
pMsg
=
msg
;
pMsg
->
header
.
vgId
=
htonl
(
tInfo
.
addr
.
nodeId
);
pMsg
->
sId
=
htobe64
(
schMgmt
.
sId
);
pMsg
->
queryId
=
htobe64
(
plan
->
id
.
queryId
);
pMsg
->
taskId
=
htobe64
(
atomic_add_fetch_64
(
&
schMgmt
.
taskId
,
1
));
pMsg
->
contentLen
=
htonl
(
msgLen
);
memcpy
(
pMsg
->
msg
,
msg
,
msgLen
);
tInfo
.
msg
=
pMsg
;
if
(
NULL
==
taosArrayPush
(
info
,
&
tInfo
))
{
qError
(
"taosArrayPush failed, idx:%d"
,
i
);
free
(
msg
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
*
pTasks
=
info
;
info
=
NULL
;
_return:
schedulerFreeTaskList
(
info
);
SCH_RET
(
code
);
}
int32_t
scheduleFetchRows
(
SSchJob
*
pJob
,
void
**
pData
)
{
if
(
NULL
==
pJob
||
NULL
==
pData
)
{
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
...
@@ -1521,6 +1598,20 @@ void scheduleFreeJob(void *job) {
qDebug
(
"QID:%"
PRIx64
" job freed"
,
queryId
);
}
void
schedulerFreeTaskList
(
SArray
*
taskList
)
{
if
(
NULL
==
taskList
)
{
return
;
}
int32_t
taskNum
=
taosArrayGetSize
(
taskList
);
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
STaskInfo
*
info
=
taosArrayGet
(
taskList
,
i
);
tfree
(
info
->
msg
);
}
taosArrayDestroy
(
taskList
);
}
void
schedulerDestroy
(
void
)
{
if
(
schMgmt
.
jobs
)
{
...
...
source/util/src/terror.c
浏览文件 @
a5902a2e
...
...
@@ -354,6 +354,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist")
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_NOT_EXIST
,
"Task not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_ALREADY_EXIST
,
"Task already exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_RES_CACHE_NOT_EXIST
,
"Task result cache not exist"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_CANCELLED
,
"Task cancelled"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_DROPPED
,
"Task dropped"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_CANCELLING
,
"Task cancelling"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_DROPPING
,
"Task dropping"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_DUPLICATTED_OPERATION
,
"Duplicatted operation"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_QRY_TASK_MSG_ERROR
,
"Task message error"
)
// grant
TAOS_DEFINE_ERROR
(
TSDB_CODE_GRANT_EXPIRED
,
"License expired"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录