Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
24190a8f
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
24190a8f
编写于
2月 17, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
dnodeShell
上级
4563289f
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
295 addition
and
470 deletion
+295
-470
src/client/src/tscSql.c
src/client/src/tscSql.c
+1
-1
src/dnode/inc/dnodeMgmt.h
src/dnode/inc/dnodeMgmt.h
+5
-0
src/dnode/inc/dnodeRead.h
src/dnode/inc/dnodeRead.h
+12
-0
src/dnode/inc/dnodeShell.h
src/dnode/inc/dnodeShell.h
+2
-1
src/dnode/inc/dnodeSystem.h
src/dnode/inc/dnodeSystem.h
+2
-1
src/dnode/inc/dnodeUtil.h
src/dnode/inc/dnodeUtil.h
+39
-0
src/dnode/inc/dnodeWrite.h
src/dnode/inc/dnodeWrite.h
+6
-4
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+2
-2
src/dnode/src/dnodeRead.c
src/dnode/src/dnodeRead.c
+44
-0
src/dnode/src/dnodeShell.c
src/dnode/src/dnodeShell.c
+103
-443
src/dnode/src/dnodeSystem.c
src/dnode/src/dnodeSystem.c
+6
-5
src/dnode/src/dnodeUtil.c
src/dnode/src/dnodeUtil.c
+25
-0
src/dnode/src/dnodeWrite.c
src/dnode/src/dnodeWrite.c
+14
-3
src/inc/http.h
src/inc/http.h
+8
-0
src/inc/taosmsg.h
src/inc/taosmsg.h
+19
-2
src/mnode/src/mgmtSystem.c
src/mnode/src/mgmtSystem.c
+1
-1
src/util/inc/tstatus.h
src/util/inc/tstatus.h
+2
-2
src/vnode/detail/inc/vnode.h
src/vnode/detail/inc/vnode.h
+1
-2
src/vnode/detail/src/vnodeRead.c
src/vnode/detail/src/vnodeRead.c
+3
-3
未找到文件。
src/client/src/tscSql.c
浏览文件 @
24190a8f
...
...
@@ -660,7 +660,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
}
// projection query on metric, pipeline retrieve data from vnode list,
// instead of two-stage merge
v
nodeProcessMsgFromShell free qhandle
// instead of two-stage merge
d
nodeProcessMsgFromShell free qhandle
nRows
=
taos_fetch_block_impl
(
res
,
rows
);
// current subclause is completed, try the next subclause
...
...
src/dnode/inc/dnodeMgmt.h
浏览文件 @
24190a8f
...
...
@@ -20,6 +20,8 @@
extern
"C"
{
#endif
#include <stdint.h>
#include <stdbool.h>
#include "tsched.h"
#include "dnode.h"
...
...
@@ -30,6 +32,9 @@ void dnodeDistributeMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj
extern
void
*
dmQhandle
;
void
dnodeSendVpeerCfgMsg
(
int32_t
vnode
);
void
dnodeSendMeterCfgMsg
(
int32_t
vnode
,
int32_t
sid
);
#ifdef __cplusplus
}
#endif
...
...
src/dnode/inc/dnodeRead.h
浏览文件 @
24190a8f
...
...
@@ -20,12 +20,24 @@
extern
"C"
{
#endif
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
#include "taosmsg.h"
#include "dnodeShell.h"
void
dnodeFreeQInfoInQueue
(
SShellObj
*
pShellObj
);
/*
* Dnode handle read messages
* The processing result is returned by callback function with pShellObj parameter
*/
int32_t
dnodeReadData
(
SQueryMeterMsg
*
msg
,
void
*
pShellObj
,
void
(
*
callback
)(
SQueryMeterRsp
*
rspMsg
,
void
*
pShellObj
));
typedef
void
(
*
SDnodeRetrieveCallbackFp
)(
int32_t
code
,
SRetrieveMeterRsp
*
pRetrieveRspMsg
,
void
*
pShellObj
);
void
dnodeRetrieveData
(
SRetrieveMeterMsg
*
pMsg
,
int32_t
msgLen
,
void
*
pShellObj
,
SDnodeRetrieveCallbackFp
callback
);
#ifdef __cplusplus
}
#endif
...
...
src/dnode/inc/dnodeShell.h
浏览文件 @
24190a8f
...
...
@@ -26,7 +26,6 @@ extern "C" {
typedef
struct
{
int
sid
;
int
vnode
;
uint32_t
ip
;
uint16_t
port
;
int32_t
count
;
// track the number of imports
...
...
@@ -38,6 +37,8 @@ typedef struct {
int32_t
dnodeInitShell
();
void
dnodeCleanupShell
();
//SDnodeStatisInfo dnodeGetStatisInfo()
#ifdef __cplusplus
...
...
src/dnode/inc/dnodeSystem.h
浏览文件 @
24190a8f
...
...
@@ -37,7 +37,8 @@ extern int32_t (*dnodeInitStorage)();
extern
void
(
*
dnodeCleanupStorage
)();
extern
void
(
*
dnodeParseParameterK
)();
extern
int32_t
tsMaxQueues
;
extern
void
**
tsRpcQhandle
;
extern
void
*
tsQueryQhandle
;
int32_t
dnodeInitSystem
();
void
dnodeCleanUpSystem
();
...
...
src/dnode/inc/dnodeUtil.h
0 → 100644
浏览文件 @
24190a8f
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_UTIL_H
#define TDENGINE_DNODE_UTIL_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
#include "taosmsg.h"
#include "tstatus.h"
EVnodeStatus
dnodeGetVnodeStatus
(
int32_t
vnode
);
bool
dnodeCheckVnodeExist
(
int32_t
vnode
);
void
*
dnodeGetVnodeObj
(
int32_t
vnode
);
#ifdef __cplusplus
}
#endif
#endif
src/dnode/inc/dnodeWrite.h
浏览文件 @
24190a8f
...
...
@@ -26,11 +26,13 @@ extern "C" {
#include "taosmsg.h"
/*
* Write data based on dnode
* If >= 0, it is affect rows
* If < 0, get error code from terrno
* Write data based on dnode, the detail result can be fetched from rsponse
* pSubmitMsg: Data to be written
* pShellObj: Used to pass a communication handle
* callback: Pass the write result through a callback function, possibly in a different thread space
* rsp: will not be freed by callback function
*/
int32_t
dnodeWriteData
(
SShellSubmitMsg
*
msg
);
void
dnodeWriteData
(
SShellSubmitMsg
*
pMsg
,
void
*
pShellObj
,
void
(
*
callback
)(
SShellSubmitRspMsg
*
rsp
,
void
*
pShellObj
)
);
/*
* Check if table already exists
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
24190a8f
...
...
@@ -548,7 +548,7 @@ int vnodeProcessCfgDnodeRequest(char *cont, int contLen, SMgmtObj *pMgmtObj) {
return
0
;
}
void
vnodeSendVpeerCfgMsg
(
in
t
vnode
)
{
void
dnodeSendVpeerCfgMsg
(
int32_
t
vnode
)
{
char
*
pMsg
,
*
pStart
;
int
msgLen
;
SVpeerCfgMsg
*
pCfg
;
...
...
@@ -566,7 +566,7 @@ void vnodeSendVpeerCfgMsg(int vnode) {
taosSendMsgToMnode
(
pObj
,
pStart
,
msgLen
);
}
int
vnodeSendMeterCfgMsg
(
int
vnode
,
in
t
sid
)
{
void
dnodeSendMeterCfgMsg
(
int32_t
vnode
,
int32_
t
sid
)
{
char
*
pMsg
,
*
pStart
;
int
msgLen
;
SMeterCfgMsg
*
pCfg
;
...
...
src/dnode/src/dnodeRead.c
浏览文件 @
24190a8f
...
...
@@ -14,4 +14,48 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#include "dnodeWrite.h"
#include "dnode.h"
#include "dnodeRead.h"
#include "dnodeSystem.h"
void
dnodeFreeQInfoInQueue
(
SShellObj
*
pShellObj
)
{
}
void
dnodeExecuteRetrieveData
(
SSchedMsg
*
pSched
)
{
SRetrieveMeterMsg
*
pRetrieve
=
(
SRetrieveMeterMsg
*
)
pSched
->
msg
;
SDnodeRetrieveCallbackFp
callback
=
(
SDnodeRetrieveCallbackFp
)
pSched
->
thandle
;
SShellObj
*
pObj
=
(
SShellObj
*
)
pSched
->
ahandle
;
SRetrieveMeterRsp
result
=
{
0
};
/*
* in case of server restart, apps may hold qhandle created by server before restart,
* which is actually invalid, therefore, signature check is required.
*/
if
(
pRetrieve
->
qhandle
!=
(
uint64_t
)
pObj
->
qhandle
)
{
// if free flag is set, client wants to clean the resources
dError
(
"QInfo:%p, qhandle:%p is not matched with saved:%p"
,
pObj
->
qhandle
,
pRetrieve
->
qhandle
,
pObj
->
qhandle
);
int32_t
code
=
TSDB_CODE_INVALID_QHANDLE
;
(
*
callback
)(
code
,
&
result
,
pObj
);
}
//TODO build response here
free
(
pSched
->
msg
);
}
void
dnodeRetrieveData
(
SRetrieveMeterMsg
*
pMsg
,
int32_t
msgLen
,
void
*
pShellObj
,
SDnodeRetrieveCallbackFp
callback
)
{
int8_t
*
msg
=
malloc
(
msgLen
);
memcpy
(
msg
,
pMsg
,
msgLen
);
SSchedMsg
schedMsg
;
schedMsg
.
msg
=
msg
;
schedMsg
.
ahandle
=
pShellObj
;
schedMsg
.
thandle
=
callback
;
schedMsg
.
fp
=
dnodeExecuteRetrieveData
;
taosScheduleTask
(
tsQueryQhandle
,
&
schedMsg
);
}
src/dnode/src/dnodeShell.c
浏览文件 @
24190a8f
...
...
@@ -19,37 +19,33 @@
#include "taosdef.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tsocket.h"
#include "tschemautil.h"
#include "textbuffer.h"
#include "trpc.h"
#include "http.h"
#include "dnode.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "dnodeSystem.h"
#include "dnodeShell.h"
#include "dnodeUtil.h"
#include "dnodeWrite.h"
static
void
dnodeProcessRetrieveRequest
(
int8_t
*
pMsg
,
int32_t
msgLen
,
SShellObj
*
pObj
);
static
void
dnodeProcessQueryRequest
(
int8_t
*
pMsg
,
int32_t
msgLen
,
SShellObj
*
pObj
);
static
void
dnodeProcessShellSubmitRequest
(
int8_t
*
pMsg
,
int32_t
msgLen
,
SShellObj
*
pObj
);
static
void
*
tsDnodeShellServer
=
NULL
;
static
SShellObj
*
tsDnodeShellList
=
NULL
;
static
int32_t
tsDnodeSelectReqNum
=
0
;
static
int32_t
tsDnodeInsertReqNum
=
0
;
static
int32_t
tsDnodeShellConns
=
0
;
int32_t
vnodeProcessRetrieveRequest
(
char
*
pMsg
,
int
msgLen
,
SShellObj
*
pObj
);
int32_t
vnodeProcessQueryRequest
(
char
*
pMsg
,
int
msgLen
,
SShellObj
*
pObj
);
int32_t
vnodeProcessShellSubmitRequest
(
char
*
pMsg
,
int
msgLen
,
SShellObj
*
pObj
);
#define NUM_OF_SESSIONS_PER_VNODE (300)
#define NUM_OF_SESSIONS_PER_DNODE (NUM_OF_SESSIONS_PER_VNODE * TSDB_MAX_VNODES)
static
void
vnodeProcessBatchSubmitTimer
(
void
*
param
,
void
*
tmrId
);
static
void
*
pShellServer
=
NULL
;
static
SShellObj
**
shellList
=
NULL
;
static
int32_t
dnodeSelectReqNum
=
0
;
static
int32_t
dnodeInsertReqNum
=
0
;
typedef
struct
{
int32_t
import
;
int32_t
vnode
;
int32_t
numOfSid
;
int32_t
ssid
;
// Start sid
SShellObj
*
pObj
;
int64_t
offset
;
// offset relative the blks
char
blks
[];
}
SBatchSubmitInfo
;
void
*
vnodeProcessMsgFromShell
(
char
*
msg
,
void
*
ahandle
,
void
*
thandle
)
{
void
*
dnodeProcessMsgFromShell
(
char
*
msg
,
void
*
ahandle
,
void
*
thandle
)
{
int
sid
,
vnode
;
SShellObj
*
pObj
=
(
SShellObj
*
)
ahandle
;
SIntMsg
*
pMsg
=
(
SIntMsg
*
)
msg
;
...
...
@@ -61,11 +57,10 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
if
(
pObj
)
{
pObj
->
thandle
=
NULL
;
dTrace
(
"QInfo:%p %s free qhandle"
,
pObj
->
qhandle
,
__FUNCTION__
);
vnodeFreeQInfoInQueue
(
pObj
->
qhandle
);
dnodeFreeQInfoInQueue
(
pObj
);
pObj
->
qhandle
=
NULL
;
vnodeList
[
pObj
->
vnode
].
shellConns
--
;
dTrace
(
"vid:%d, shell connection:%d is gone, shellConns:%d"
,
pObj
->
vnode
,
pObj
->
sid
,
vnodeList
[
pObj
->
vnode
].
shellConns
);
tsDnodeShellConns
--
;
dTrace
(
"shell connection:%d is gone, shellConns:%d"
,
pObj
->
sid
,
tsDnodeShellConns
);
}
return
NULL
;
}
...
...
@@ -73,53 +68,34 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
taosGetRpcConnInfo
(
thandle
,
&
peerId
,
&
peerIp
,
&
peerPort
,
&
vnode
,
&
sid
);
if
(
pObj
==
NULL
)
{
if
(
shellList
[
vnode
])
{
pObj
=
shellList
[
vnode
]
+
sid
;
pObj
->
thandle
=
thandle
;
pObj
->
sid
=
sid
;
pObj
->
vnode
=
vnode
;
pObj
->
ip
=
peerIp
;
tinet_ntoa
(
ipstr
,
peerIp
);
vnodeList
[
pObj
->
vnode
].
shellConns
++
;
dTrace
(
"vid:%d, shell connection:%d from ip:%s is created, shellConns:%d"
,
vnode
,
sid
,
ipstr
,
vnodeList
[
pObj
->
vnode
].
shellConns
);
}
else
{
dError
(
"vid:%d, vnode not there, shell connection shall be closed"
,
vnode
);
return
NULL
;
}
pObj
=
tsDnodeShellList
+
sid
;
pObj
->
thandle
=
thandle
;
pObj
->
sid
=
sid
;
pObj
->
ip
=
peerIp
;
tinet_ntoa
(
ipstr
,
peerIp
);
tsDnodeShellConns
--
;
dTrace
(
"shell connection:%d from ip:%s is created, shellConns:%d"
,
sid
,
ipstr
,
tsDnodeShellConns
);
}
else
{
if
(
pObj
!=
shellList
[
vnode
]
+
sid
)
{
dError
(
"
vid:%d, shell connection:%d, pObj:%p is not matched with:%p"
,
vnode
,
sid
,
pObj
,
shellList
[
vnode
]
+
sid
);
if
(
pObj
!=
tsDnodeShellList
+
sid
)
{
dError
(
"
shell connection:%d, pObj:%p is not matched with:%p"
,
sid
,
pObj
,
tsDnodeShellList
+
sid
);
return
NULL
;
}
}
dTrace
(
"vid:%d sid:%d, msg:%s is received pConn:%p"
,
vnode
,
sid
,
taosMsg
[
pMsg
->
msgType
],
thandle
);
if
(
dnodeGetRunStatus
()
!=
TSDB_DNODE_RUN_STATUS_RUNING
)
{
taosSendSimpleRsp
(
thandle
,
pMsg
->
msgType
+
1
,
TSDB_CODE_NOT_READY
);
dTrace
(
"sid:%d, shell query msg is ignored since dnode not running"
,
sid
);
return
pObj
;
}
if
(
pMsg
->
msgType
==
TSDB_MSG_TYPE_QUERY
)
{
if
(
vnodeList
[
vnode
].
vnodeStatus
==
TSDB_VN_STATUS_MASTER
||
vnodeList
[
vnode
].
vnodeStatus
==
TSDB_VN_STATUS_SLAVE
)
{
vnodeProcessQueryRequest
((
char
*
)
pMsg
->
content
,
pMsg
->
msgLen
-
sizeof
(
SIntMsg
),
pObj
);
}
else
{
taosSendSimpleRsp
(
thandle
,
pMsg
->
msgType
+
1
,
TSDB_CODE_NOT_READY
);
dTrace
(
"vid:%d sid:%d, shell query msg is ignored since in status:%s"
,
vnode
,
sid
,
taosGetVnodeStatusStr
(
vnodeList
[
vnode
].
vnodeStatus
));
}
dnodeProcessQueryRequest
(
pMsg
->
content
,
pMsg
->
msgLen
-
sizeof
(
SIntMsg
),
pObj
);
}
else
if
(
pMsg
->
msgType
==
TSDB_MSG_TYPE_RETRIEVE
)
{
if
(
vnodeList
[
vnode
].
vnodeStatus
==
TSDB_VN_STATUS_MASTER
||
vnodeList
[
vnode
].
vnodeStatus
==
TSDB_VN_STATUS_SLAVE
)
{
vnodeProcessRetrieveRequest
((
char
*
)
pMsg
->
content
,
pMsg
->
msgLen
-
sizeof
(
SIntMsg
),
pObj
);
}
else
{
taosSendSimpleRsp
(
thandle
,
pMsg
->
msgType
+
1
,
TSDB_CODE_NOT_READY
);
dTrace
(
"vid:%d sid:%d, shell retrieve msg is ignored since in status:%s"
,
vnode
,
sid
,
taosGetVnodeStatusStr
(
vnodeList
[
vnode
].
vnodeStatus
));
}
dnodeProcessRetrieveRequest
(
pMsg
->
content
,
pMsg
->
msgLen
-
sizeof
(
SIntMsg
),
pObj
);
}
else
if
(
pMsg
->
msgType
==
TSDB_MSG_TYPE_SUBMIT
)
{
if
(
vnodeList
[
vnode
].
vnodeStatus
==
TSDB_VN_STATUS_MASTER
)
{
vnodeProcessShellSubmitRequest
((
char
*
)
pMsg
->
content
,
pMsg
->
msgLen
-
sizeof
(
SIntMsg
),
pObj
);
}
else
if
(
vnodeList
[
vnode
].
vnodeStatus
==
TSDB_VN_STATUS_SLAVE
)
{
taosSendSimpleRsp
(
thandle
,
pMsg
->
msgType
+
1
,
TSDB_CODE_REDIRECT
);
dTrace
(
"vid:%d sid:%d, shell submit msg is redirect since in status:%s"
,
vnode
,
sid
,
taosGetVnodeStatusStr
(
vnodeList
[
vnode
].
vnodeStatus
));
}
else
{
taosSendSimpleRsp
(
thandle
,
pMsg
->
msgType
+
1
,
TSDB_CODE_NOT_READY
);
dTrace
(
"vid:%d sid:%d, shell submit msg is ignored since in status:%s"
,
vnode
,
sid
,
taosGetVnodeStatusStr
(
vnodeList
[
vnode
].
vnodeStatus
));
}
dnodeProcessShellSubmitRequest
(
pMsg
->
content
,
pMsg
->
msgLen
-
sizeof
(
SIntMsg
),
pObj
);
}
else
{
dError
(
"%s is not processed"
,
taosMsg
[
pMsg
->
msgType
]);
}
...
...
@@ -128,17 +104,13 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
}
int32_t
dnodeInitShell
()
{
int
size
;
SRpcInit
rpcInit
;
size
=
TSDB_MAX_VNODES
*
sizeof
(
SShellObj
*
);
shellList
=
(
SShellObj
**
)
malloc
(
size
);
if
(
shellList
==
NULL
)
return
-
1
;
memset
(
shellList
,
0
,
size
);
int
numOfThreads
=
tsNumOfCores
*
tsNumOfThreadsPerCore
;
numOfThreads
=
(
1
.
0
-
tsRatioOfQueryThreads
)
*
numOfThreads
/
2
.
0
;
if
(
numOfThreads
<
1
)
numOfThreads
=
1
;
if
(
numOfThreads
<
1
)
{
numOfThreads
=
1
;
}
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
...
...
@@ -147,92 +119,51 @@ int32_t dnodeInitShell() {
rpcInit
.
localPort
=
tsVnodeShellPort
;
rpcInit
.
label
=
"DND-shell"
;
rpcInit
.
numOfThreads
=
numOfThreads
;
rpcInit
.
fp
=
v
nodeProcessMsgFromShell
;
rpcInit
.
fp
=
d
nodeProcessMsgFromShell
;
rpcInit
.
bits
=
TSDB_SHELL_VNODE_BITS
;
rpcInit
.
numOfChanns
=
TSDB_MAX_VNODES
;
rpcInit
.
sessionsPerChann
=
16
;
rpcInit
.
idMgmt
=
TAOS_ID_FREE
;
rpcInit
.
connType
=
TAOS_CONN_SOCKET_TYPE_S
();
rpcInit
.
idleTime
=
tsShellActivityTimer
*
2000
;
rpcInit
.
qhandle
=
r
pcQhandle
[
0
];
rpcInit
.
efp
=
vnodeSendVpeerCfgMsg
;
rpcInit
.
qhandle
=
tsR
pcQhandle
[
0
];
//
rpcInit.efp = vnodeSendVpeerCfgMsg;
p
ShellServer
=
taosOpenRpc
(
&
rpcInit
);
if
(
p
ShellServer
==
NULL
)
{
tsDnode
ShellServer
=
taosOpenRpc
(
&
rpcInit
);
if
(
tsDnode
ShellServer
==
NULL
)
{
dError
(
"failed to init connection to shell"
);
return
-
1
;
}
return
0
;
}
int
vnodeOpenShellVnode
(
int
vnode
)
{
if
(
shellList
[
vnode
]
!=
NULL
)
{
dError
(
"vid:%d, shell is already opened"
,
vnode
);
return
-
1
;
}
const
int32_t
MIN_NUM_OF_SESSIONS
=
300
;
SVnodeCfg
*
pCfg
=
&
vnodeList
[
vnode
].
cfg
;
int32_t
sessions
=
(
int32_t
)
MAX
(
pCfg
->
maxSessions
*
1
.
1
,
MIN_NUM_OF_SESSIONS
);
size_t
size
=
sessions
*
sizeof
(
SShellObj
);
shellList
[
vnode
]
=
(
SShellObj
*
)
calloc
(
1
,
size
);
if
(
shellList
[
vnode
]
==
NULL
)
{
dError
(
"vid:%d, sessions:%d, failed to allocate shellObj, size:%d"
,
vnode
,
pCfg
->
maxSessions
,
size
);
const
int32_t
size
=
NUM_OF_SESSIONS_PER_DNODE
*
sizeof
(
SShellObj
);
tsDnodeShellList
=
(
SShellObj
*
)
malloc
(
size
);
if
(
tsDnodeShellList
==
NULL
)
{
dError
(
"failed to allocate shellObj, sessions:%d"
,
NUM_OF_SESSIONS_PER_DNODE
);
return
-
1
;
}
memset
(
tsDnodeShellList
,
0
,
size
);
if
(
taosOpenRpcChannWithQ
(
pShellServer
,
vnode
,
sessions
,
rpcQhandle
[(
vnode
+
1
)
%
tsMaxQueues
])
!=
TSDB_CODE_SUCCESS
)
{
dError
(
"vid:%d, sessions:%d, failed to open shell"
,
vnode
,
pCfg
->
maxSessions
);
// TODO re initialize tsRpcQhandle
if
(
taosOpenRpcChannWithQ
(
tsDnodeShellServer
,
0
,
NUM_OF_SESSIONS_PER_DNODE
,
tsRpcQhandle
)
!=
TSDB_CODE_SUCCESS
)
{
dError
(
"sessions:%d, failed to open shell"
,
NUM_OF_SESSIONS_PER_DNODE
);
return
-
1
;
}
d
Print
(
"vid:%d, sessions:%d, shell is opened"
,
vnode
,
pCfg
->
maxSessions
);
d
Error
(
"sessions:%d, shell is opened"
,
NUM_OF_SESSIONS_PER_DNODE
);
return
TSDB_CODE_SUCCESS
;
}
static
void
vnodeDelayedFreeResource
(
void
*
param
,
void
*
tmrId
)
{
int32_t
vnode
=
*
(
int32_t
*
)
param
;
dTrace
(
"vid:%d, start to free resources for 500ms arrived"
,
vnode
);
taosCloseRpcChann
(
pShellServer
,
vnode
);
// close connection
tfree
(
shellList
[
vnode
]);
//free SShellObj
tfree
(
param
);
memset
(
vnodeList
+
vnode
,
0
,
sizeof
(
SVnodeObj
));
dTrace
(
"vid:%d, status set to %s"
,
vnode
,
taosGetVnodeStatusStr
(
vnodeList
[
vnode
].
vnodeStatus
));
vnodeCalcOpenVnodes
();
}
void
vnodeCloseShellVnode
(
int
vnode
)
{
if
(
shellList
[
vnode
]
==
NULL
)
return
;
for
(
int
i
=
0
;
i
<
vnodeList
[
vnode
].
cfg
.
maxSessions
;
++
i
)
{
void
*
qhandle
=
shellList
[
vnode
][
i
].
qhandle
;
if
(
qhandle
!=
NULL
)
{
vnodeDecRefCount
(
qhandle
);
}
void
dnodeCleanupShell
()
{
if
(
tsDnodeShellServer
)
{
taosCloseRpc
(
tsDnodeShellServer
);
}
int32_t
*
v
=
malloc
(
sizeof
(
int32_t
));
*
v
=
vnode
;
/*
* free the connection related resource after 5sec.
* 1. The msg, as well as SRpcConn may be in the task queue, free it immediate will cause crash
* 2. Free connection may cause *(SRpcConn*)pObj->thandle to be invalid to access.
*/
dTrace
(
"vid:%d, free resources in 500ms"
,
vnode
);
taosTmrStart
(
vnodeDelayedFreeResource
,
500
,
v
,
vnodeTmrCtrl
);
}
void
vnodeCleanUpShell
()
{
if
(
pShellServer
)
taosCloseRpc
(
pShellServer
);
for
(
int
i
=
0
;
i
<
NUM_OF_SESSIONS_PER_DNODE
;
++
i
)
{
dnodeFreeQInfoInQueue
(
tsDnodeShellList
+
i
);
}
tfree
(
s
hellList
);
//tfree(tsDnodeS
hellList);
}
int
vnodeSendQueryRspMsg
(
SShellObj
*
pObj
,
int
code
,
void
*
qhandle
)
{
...
...
@@ -255,7 +186,7 @@ int vnodeSendQueryRspMsg(SShellObj *pObj, int code, void *qhandle) {
return
msgLen
;
}
int
vnodeSendShellSubmitRspMsg
(
SShellObj
*
pObj
,
int
code
,
in
t
numOfPoints
)
{
int
32_t
dnodeSendShellSubmitRspMsg
(
SShellObj
*
pObj
,
int32_t
code
,
int32_
t
numOfPoints
)
{
char
*
pMsg
,
*
pStart
;
int
msgLen
;
...
...
@@ -395,344 +326,73 @@ _query_over:
vnodeFreeColumnInfo
(
&
pQueryMsg
->
colList
[
i
]);
}
atomic_fetch_add_32
(
&
d
nodeSelectReqNum
,
1
);
atomic_fetch_add_32
(
&
tsD
nodeSelectReqNum
,
1
);
return
ret
;
}
void
vnodeExecuteRetrieveReq
(
SSchedMsg
*
pSched
)
{
char
*
pMsg
=
pSched
->
msg
;
int
msgLen
;
SShellObj
*
pObj
=
(
SShellObj
*
)
pSched
->
ahandle
;
SRetrieveMeterMsg
*
pRetrieve
;
SRetrieveMeterRsp
*
pRsp
;
int
numOfRows
=
0
,
rowSize
=
0
,
size
=
0
;
int16_t
timePrec
=
TSDB_TIME_PRECISION_MILLI
;
char
*
pStart
;
int
code
=
0
;
pRetrieve
=
(
SRetrieveMeterMsg
*
)
pMsg
;
SQInfo
*
pQInfo
=
(
SQInfo
*
)
pRetrieve
->
qhandle
;
pRetrieve
->
free
=
htons
(
pRetrieve
->
free
);
if
((
pRetrieve
->
free
&
TSDB_QUERY_TYPE_FREE_RESOURCE
)
!=
TSDB_QUERY_TYPE_FREE_RESOURCE
)
{
dTrace
(
"retrieve msg, handle:%p, free:%d"
,
pRetrieve
->
qhandle
,
pRetrieve
->
free
);
}
else
{
dTrace
(
"retrieve msg to free resource from client, handle:%p, free:%d"
,
pRetrieve
->
qhandle
,
pRetrieve
->
free
);
}
/*
* in case of server restart, apps may hold qhandle created by server before restart,
* which is actually invalid, therefore, signature check is required.
*/
if
(
pRetrieve
->
qhandle
==
(
uint64_t
)
pObj
->
qhandle
)
{
// if free flag is set, client wants to clean the resources
if
((
pRetrieve
->
free
&
TSDB_QUERY_TYPE_FREE_RESOURCE
)
!=
TSDB_QUERY_TYPE_FREE_RESOURCE
)
{
code
=
vnodeRetrieveQueryInfo
((
void
*
)(
pRetrieve
->
qhandle
),
&
numOfRows
,
&
rowSize
,
&
timePrec
);
}
}
else
{
dError
(
"QInfo:%p, qhandle:%p is not matched with saved:%p"
,
pObj
->
qhandle
,
pRetrieve
->
qhandle
,
pObj
->
qhandle
);
code
=
TSDB_CODE_INVALID_QHANDLE
;
}
if
(
code
==
TSDB_CODE_SUCCESS
)
{
size
=
vnodeGetResultSize
((
void
*
)(
pRetrieve
->
qhandle
),
&
numOfRows
);
// buffer size for progress information, including meter count,
// and for each meter, including 'uid' and 'TSKEY'.
int
progressSize
=
0
;
if
(
pQInfo
->
pMeterQuerySupporter
!=
NULL
)
progressSize
=
pQInfo
->
pMeterQuerySupporter
->
numOfMeters
*
(
sizeof
(
int64_t
)
+
sizeof
(
TSKEY
))
+
sizeof
(
int32_t
);
else
if
(
pQInfo
->
pObj
!=
NULL
)
progressSize
=
sizeof
(
int64_t
)
+
sizeof
(
TSKEY
)
+
sizeof
(
int32_t
);
pStart
=
taosBuildRspMsgWithSize
(
pObj
->
thandle
,
TSDB_MSG_TYPE_RETRIEVE_RSP
,
progressSize
+
size
+
100
);
if
(
pStart
==
NULL
)
{
taosSendSimpleRsp
(
pObj
->
thandle
,
TSDB_MSG_TYPE_RETRIEVE_RSP
,
TSDB_CODE_SERV_OUT_OF_MEMORY
);
goto
_exit
;
}
}
pMsg
=
pStart
;
*
pMsg
=
code
;
pMsg
++
;
pRsp
=
(
SRetrieveMeterRsp
*
)
pMsg
;
pRsp
->
numOfRows
=
htonl
(
numOfRows
);
pRsp
->
precision
=
htons
(
timePrec
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pRsp
->
offset
=
htobe64
(
vnodeGetOffsetVal
((
void
*
)
pRetrieve
->
qhandle
));
pRsp
->
useconds
=
htobe64
(((
SQInfo
*
)(
pRetrieve
->
qhandle
))
->
useconds
);
}
else
{
pRsp
->
offset
=
0
;
pRsp
->
useconds
=
0
;
}
pMsg
=
pRsp
->
data
;
if
(
numOfRows
>
0
&&
code
==
TSDB_CODE_SUCCESS
)
{
vnodeSaveQueryResult
((
void
*
)(
pRetrieve
->
qhandle
),
pRsp
->
data
,
&
size
);
}
pMsg
+=
size
;
// write the progress information of each meter to response
// this is required by subscriptions
if
(
numOfRows
>
0
&&
code
==
TSDB_CODE_SUCCESS
)
{
if
(
pQInfo
->
pMeterQuerySupporter
!=
NULL
&&
pQInfo
->
pMeterQuerySupporter
->
pMeterSidExtInfo
!=
NULL
)
{
*
((
int32_t
*
)
pMsg
)
=
htonl
(
pQInfo
->
pMeterQuerySupporter
->
numOfMeters
);
pMsg
+=
sizeof
(
int32_t
);
for
(
int32_t
i
=
0
;
i
<
pQInfo
->
pMeterQuerySupporter
->
numOfMeters
;
i
++
)
{
*
((
int64_t
*
)
pMsg
)
=
htobe64
(
pQInfo
->
pMeterQuerySupporter
->
pMeterSidExtInfo
[
i
]
->
uid
);
pMsg
+=
sizeof
(
int64_t
);
*
((
TSKEY
*
)
pMsg
)
=
htobe64
(
pQInfo
->
pMeterQuerySupporter
->
pMeterSidExtInfo
[
i
]
->
key
);
pMsg
+=
sizeof
(
TSKEY
);
}
}
else
if
(
pQInfo
->
pObj
!=
NULL
)
{
*
((
int32_t
*
)
pMsg
)
=
htonl
(
1
);
pMsg
+=
sizeof
(
int32_t
);
*
((
int64_t
*
)
pMsg
)
=
htobe64
(
pQInfo
->
pObj
->
uid
);
pMsg
+=
sizeof
(
int64_t
);
if
(
pQInfo
->
pointsRead
>
0
)
{
*
((
TSKEY
*
)
pMsg
)
=
htobe64
(
pQInfo
->
query
.
lastKey
+
1
);
}
else
{
*
((
TSKEY
*
)
pMsg
)
=
htobe64
(
pQInfo
->
query
.
lastKey
);
}
pMsg
+=
sizeof
(
TSKEY
);
}
}
msgLen
=
pMsg
-
pStart
;
assert
(
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
);
if
(
numOfRows
==
0
&&
(
pRetrieve
->
qhandle
==
(
uint64_t
)
pObj
->
qhandle
)
&&
(
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
&&
pRetrieve
->
qhandle
!=
0
)
{
dTrace
(
"QInfo:%p %s free qhandle code:%d"
,
pObj
->
qhandle
,
__FUNCTION__
,
code
);
vnodeDecRefCount
(
pObj
->
qhandle
);
pObj
->
qhandle
=
NULL
;
}
taosSendMsgToPeer
(
pObj
->
thandle
,
pStart
,
msgLen
);
_exit:
free
(
pSched
->
msg
);
}
int
vnodeProcessRetrieveRequest
(
char
*
pMsg
,
int
msgLen
,
SShellObj
*
pObj
)
{
SSchedMsg
schedMsg
;
char
*
msg
=
malloc
(
msgLen
);
memcpy
(
msg
,
pMsg
,
msgLen
);
schedMsg
.
msg
=
msg
;
schedMsg
.
ahandle
=
pObj
;
schedMsg
.
fp
=
vnodeExecuteRetrieveReq
;
taosScheduleTask
(
queryQhandle
,
&
schedMsg
);
return
msgLen
;
}
static
int
vnodeCheckSubmitBlockContext
(
SShellSubmitBlock
*
pBlocks
,
SVnodeObj
*
pVnode
)
{
int32_t
sid
=
htonl
(
pBlocks
->
sid
);
uint64_t
uid
=
htobe64
(
pBlocks
->
uid
);
if
(
sid
>=
pVnode
->
cfg
.
maxSessions
||
sid
<=
0
)
{
dError
(
"vid:%d sid:%d, sid is out of range"
,
pVnode
->
vnode
,
sid
);
return
TSDB_CODE_INVALID_TABLE_ID
;
}
SMeterObj
*
pMeterObj
=
pVnode
->
meterList
[
sid
];
if
(
pMeterObj
==
NULL
)
{
dError
(
"vid:%d sid:%d, not active table"
,
pVnode
->
vnode
,
sid
);
vnodeSendMeterCfgMsg
(
pVnode
->
vnode
,
sid
);
return
TSDB_CODE_NOT_ACTIVE_TABLE
;
void
dnodeProcessRetrieveRequestCb
(
int
code
,
SRetrieveMeterRsp
*
result
,
SShellObj
*
pObj
)
{
if
(
pObj
==
NULL
||
result
==
NULL
||
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
return
;
}
if
(
pMeterObj
->
uid
!=
uid
)
{
dError
(
"vid:%d sid:%d id:%s, uid:%"
PRIu64
", uid in msg:%"
PRIu64
", uid mismatch"
,
pVnode
->
vnode
,
sid
,
pMeterObj
->
meterId
,
pMeterObj
->
uid
,
uid
);
return
TSDB_CODE_INVALID_SUBMIT_MSG
;
}
return
TSDB_CODE_SUCCESS
;
}
static
int
vnodeDoSubmitJob
(
SVnodeObj
*
pVnode
,
int
import
,
int32_t
*
ssid
,
int32_t
esid
,
SShellSubmitBlock
**
ppBlocks
,
TSKEY
now
,
SShellObj
*
pObj
)
{
SShellSubmitBlock
*
pBlocks
=
*
ppBlocks
;
int
code
=
TSDB_CODE_SUCCESS
;
int32_t
numOfPoints
=
0
;
int32_t
i
=
0
;
SShellSubmitBlock
tBlock
;
for
(
i
=
*
ssid
;
i
<
esid
;
i
++
)
{
numOfPoints
=
0
;
tBlock
=
*
pBlocks
;
code
=
vnodeCheckSubmitBlockContext
(
pBlocks
,
pVnode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
break
;
SMeterObj
*
pMeterObj
=
(
SMeterObj
*
)(
pVnode
->
meterList
[
htonl
(
pBlocks
->
sid
)]);
// dont include sid, vid
int32_t
subMsgLen
=
sizeof
(
pBlocks
->
numOfRows
)
+
htons
(
pBlocks
->
numOfRows
)
*
pMeterObj
->
bytesPerPoint
;
int32_t
sversion
=
htonl
(
pBlocks
->
sversion
);
if
(
import
)
{
code
=
vnodeImportPoints
(
pMeterObj
,
(
char
*
)
&
(
pBlocks
->
numOfRows
),
subMsgLen
,
TSDB_DATA_SOURCE_SHELL
,
pObj
,
sversion
,
&
numOfPoints
,
now
);
pObj
->
numOfTotalPoints
+=
numOfPoints
;
// records for one table should be consecutive located in the payload buffer, which is guaranteed by client
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pObj
->
count
--
;
}
}
else
{
code
=
vnodeInsertPoints
(
pMeterObj
,
(
char
*
)
&
(
pBlocks
->
numOfRows
),
subMsgLen
,
TSDB_DATA_SOURCE_SHELL
,
NULL
,
sversion
,
&
numOfPoints
,
now
);
pObj
->
numOfTotalPoints
+=
numOfPoints
;
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
break
;
pBlocks
=
(
SShellSubmitBlock
*
)((
char
*
)
pBlocks
+
sizeof
(
SShellSubmitBlock
)
+
htons
(
pBlocks
->
numOfRows
)
*
pMeterObj
->
bytesPerPoint
);
}
*
ssid
=
i
;
*
ppBlocks
=
pBlocks
;
/* Since the pBlock part can be changed by the vnodeForwardToPeer interface,
* which is also possible to be used again. For that case, we just copy the original
* block content back.
*/
if
(
import
&&
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
))
{
memcpy
((
void
*
)
pBlocks
,
(
void
*
)
&
tBlock
,
sizeof
(
SShellSubmitBlock
));
}
return
code
;
static
void
dnodeProcessRetrieveRequest
(
int8_t
*
pMsg
,
int32_t
msgLen
,
SShellObj
*
pObj
)
{
SRetrieveMeterMsg
*
pRetrieve
=
(
SRetrieveMeterMsg
*
)
pMsg
;
dnodeRetrieveData
(
pRetrieve
,
msgLen
,
pObj
,
dnodeProcessRetrieveRequestCb
);
}
int
vnodeProcessShellSubmitRequest
(
char
*
pMsg
,
int
msgLen
,
SShellObj
*
pObj
)
{
int
code
=
0
,
ret
=
0
;
int32_t
i
=
0
;
SShellSubmitMsg
shellSubmit
=
*
(
SShellSubmitMsg
*
)
pMsg
;
SShellSubmitMsg
*
pSubmit
=
&
shellSubmit
;
SShellSubmitBlock
*
pBlocks
=
NULL
;
pSubmit
->
import
=
htons
(
pSubmit
->
import
);
pSubmit
->
vnode
=
htons
(
pSubmit
->
vnode
);
pSubmit
->
numOfSid
=
htonl
(
pSubmit
->
numOfSid
);
if
(
pSubmit
->
numOfSid
<=
0
)
{
dError
(
"invalid num of meters:%d"
,
pSubmit
->
numOfSid
);
code
=
TSDB_CODE_INVALID_QUERY_MSG
;
goto
_submit_over
;
void
dnodeProcessShellSubmitRequestCb
(
SShellSubmitRspMsg
*
result
,
void
*
pObj
)
{
if
(
pObj
==
NULL
||
result
==
NULL
||
result
->
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
return
;
}
if
(
pSubmit
->
vnode
>=
TSDB_MAX_VNODES
||
pSubmit
->
vnode
<
0
)
{
dTrace
(
"vnode:%d is out of range"
,
pSubmit
->
vnode
);
code
=
TSDB_CODE_INVALID_VNODE_ID
;
goto
_submit_over
;
SShellObj
*
pShellObj
=
(
SShellObj
*
)
pObj
;
int32_t
msgLen
=
sizeof
(
SShellSubmitRspMsg
)
+
result
->
numOfFailedBlocks
*
sizeof
(
SShellSubmitRspBlock
);
SShellSubmitRspMsg
*
submitRsp
=
(
SShellSubmitRspMsg
*
)
taosBuildRspMsgWithSize
(
pShellObj
->
thandle
,
TSDB_MSG_TYPE_SUBMIT_RSP
,
msgLen
);
if
(
submitRsp
==
NULL
)
{
return
;
}
SVnodeObj
*
pVnode
=
vnodeList
+
pSubmit
->
vnode
;
if
(
pVnode
->
cfg
.
maxSessions
==
0
||
pVnode
->
meterList
==
NULL
)
{
dError
(
"vid:%d is not activated for submit"
,
pSubmit
->
vnode
);
vnodeSendVpeerCfgMsg
(
pSubmit
->
vnode
);
code
=
TSDB_CODE_NOT_ACTIVE_VNODE
;
goto
_submit_over
;
}
dTrace
(
"code:%d, numOfRows:%d affectedRows:%d"
,
result
->
code
,
result
->
numOfRows
,
result
->
affectedRows
);
memcpy
(
submitRsp
,
result
,
msgLen
);
if
(
!
(
pVnode
->
accessState
&
TSDB_VN_WRITE_ACCCESS
))
{
code
=
TSDB_CODE_NO_WRITE_ACCESS
;
goto
_submit_over
;
}
if
(
tsAvailDataDirGB
<
tsMinimalDataDirGB
)
{
dError
(
"server disk space remain %.3f GB, need at least %.3f GB, stop writing"
,
tsAvailDataDirGB
,
tsMinimalDataDirGB
);
code
=
TSDB_CODE_SERV_NO_DISKSPACE
;
goto
_submit_over
;
}
pObj
->
count
=
pSubmit
->
numOfSid
;
// for import
pObj
->
code
=
0
;
// for import
pObj
->
numOfTotalPoints
=
0
;
TSKEY
now
=
taosGetTimestamp
(
pVnode
->
cfg
.
precision
);
pBlocks
=
(
SShellSubmitBlock
*
)(
pMsg
+
sizeof
(
SShellSubmitMsg
));
i
=
0
;
code
=
vnodeDoSubmitJob
(
pVnode
,
pSubmit
->
import
,
&
i
,
pSubmit
->
numOfSid
,
&
pBlocks
,
now
,
pObj
);
_submit_over:
ret
=
0
;
if
(
pSubmit
->
import
)
{
// Import case
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
SBatchSubmitInfo
*
pSubmitInfo
=
(
SBatchSubmitInfo
*
)
calloc
(
1
,
sizeof
(
SBatchSubmitInfo
)
+
msgLen
-
sizeof
(
SShellSubmitMsg
));
if
(
pSubmitInfo
==
NULL
)
{
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
ret
=
vnodeSendShellSubmitRspMsg
(
pObj
,
code
,
pObj
->
numOfTotalPoints
);
}
else
{
// Start a timer to process the next part of request
pSubmitInfo
->
import
=
1
;
pSubmitInfo
->
vnode
=
pSubmit
->
vnode
;
pSubmitInfo
->
numOfSid
=
pSubmit
->
numOfSid
;
pSubmitInfo
->
ssid
=
i
;
// start from this position, not the initial position
pSubmitInfo
->
pObj
=
pObj
;
pSubmitInfo
->
offset
=
((
char
*
)
pBlocks
)
-
(
pMsg
+
sizeof
(
SShellSubmitMsg
));
assert
(
pSubmitInfo
->
offset
>=
0
);
memcpy
((
void
*
)(
pSubmitInfo
->
blks
),
(
void
*
)(
pMsg
+
sizeof
(
SShellSubmitMsg
)),
msgLen
-
sizeof
(
SShellSubmitMsg
));
taosTmrStart
(
vnodeProcessBatchSubmitTimer
,
10
,
(
void
*
)
pSubmitInfo
,
vnodeTmrCtrl
);
}
}
else
{
if
(
code
==
TSDB_CODE_SUCCESS
)
assert
(
pObj
->
count
==
0
);
ret
=
vnodeSendShellSubmitRspMsg
(
pObj
,
code
,
pObj
->
numOfTotalPoints
);
for
(
int
i
=
0
;
i
<
submitRsp
->
numOfFailedBlocks
;
++
i
)
{
SShellSubmitRspBlock
*
block
=
&
submitRsp
->
failedBlocks
[
i
];
if
(
block
->
code
==
TSDB_CODE_NOT_ACTIVE_VNODE
||
block
->
code
==
TSDB_CODE_INVALID_VNODE_ID
)
{
dnodeSendVpeerCfgMsg
(
block
->
vnode
);
}
else
if
(
block
->
code
==
TSDB_CODE_INVALID_TABLE_ID
||
block
->
code
==
TSDB_CODE_NOT_ACTIVE_TABLE
)
{
dnodeSendMeterCfgMsg
(
block
->
vnode
,
block
->
sid
);
}
}
else
{
// Insert case
ret
=
vnodeSendShellSubmitRspMsg
(
pObj
,
code
,
pObj
->
numOfTotalPoints
);
block
->
vnode
=
htonl
(
block
->
vnode
);
block
->
sid
=
htonl
(
block
->
sid
);
block
->
code
=
htonl
(
block
->
code
);
}
submitRsp
->
code
=
htonl
(
submitRsp
->
code
);
submitRsp
->
numOfRows
=
htonl
(
submitRsp
->
numOfRows
);
submitRsp
->
affectedRows
=
htonl
(
submitRsp
->
affectedRows
);
submitRsp
->
failedRows
=
htonl
(
submitRsp
->
failedRows
);
submitRsp
->
numOfFailedBlocks
=
htonl
(
submitRsp
->
numOfFailedBlocks
);
atomic_fetch_add_32
(
&
dnodeInsertReqNum
,
1
);
return
ret
;
taosSendMsgToPeer
(
pShellObj
->
thandle
,
(
int8_t
*
)
submitRsp
,
msgLen
);
}
static
void
vnodeProcessBatchSubmitTimer
(
void
*
param
,
void
*
tmrId
)
{
SBatchSubmitInfo
*
pSubmitInfo
=
(
SBatchSubmitInfo
*
)
param
;
assert
(
pSubmitInfo
!=
NULL
&&
pSubmitInfo
->
import
);
int32_t
i
=
0
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SShellObj
*
pShell
=
pSubmitInfo
->
pObj
;
SVnodeObj
*
pVnode
=
&
vnodeList
[
pSubmitInfo
->
vnode
];
SShellSubmitBlock
*
pBlocks
=
(
SShellSubmitBlock
*
)(
pSubmitInfo
->
blks
+
pSubmitInfo
->
offset
);
TSKEY
now
=
taosGetTimestamp
(
pVnode
->
cfg
.
precision
);
i
=
pSubmitInfo
->
ssid
;
code
=
vnodeDoSubmitJob
(
pVnode
,
pSubmitInfo
->
import
,
&
i
,
pSubmitInfo
->
numOfSid
,
&
pBlocks
,
now
,
pShell
);
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
pSubmitInfo
->
ssid
=
i
;
pSubmitInfo
->
offset
=
((
char
*
)
pBlocks
)
-
pSubmitInfo
->
blks
;
taosTmrStart
(
vnodeProcessBatchSubmitTimer
,
10
,
(
void
*
)
pSubmitInfo
,
vnodeTmrCtrl
);
}
else
{
if
(
code
==
TSDB_CODE_SUCCESS
)
assert
(
pShell
->
count
==
0
);
tfree
(
param
);
vnodeSendShellSubmitRspMsg
(
pShell
,
code
,
pShell
->
numOfTotalPoints
);
}
static
void
dnodeProcessShellSubmitRequest
(
int8_t
*
pMsg
,
int32_t
msgLen
,
SShellObj
*
pObj
)
{
SShellSubmitMsg
*
pSubmit
=
(
SShellSubmitMsg
*
)
pMsg
;
dnodeWriteData
(
pSubmit
,
pObj
,
dnodeProcessShellSubmitRequestCb
);
atomic_fetch_add_32
(
&
tsDnodeInsertReqNum
,
1
);
}
SDnodeStatisInfo
dnodeGetStatisInfo
()
{
SDnodeStatisInfo
info
=
{
0
};
if
(
dnodeGetRunStatus
()
==
TSDB_DNODE_RUN_STATUS_RUNING
)
{
info
.
httpReqNum
=
httpGetReqCount
();
info
.
selectReqNum
=
atomic_exchange_32
(
&
d
nodeSelectReqNum
,
0
);
info
.
insertReqNum
=
atomic_exchange_32
(
&
d
nodeInsertReqNum
,
0
);
info
.
httpReqNum
=
httpGetReqCount
();
info
.
selectReqNum
=
atomic_exchange_32
(
&
tsD
nodeSelectReqNum
,
0
);
info
.
insertReqNum
=
atomic_exchange_32
(
&
tsD
nodeInsertReqNum
,
0
);
}
return
info
;
...
...
src/dnode/src/dnodeSystem.c
浏览文件 @
24190a8f
...
...
@@ -52,9 +52,9 @@ static int32_t dnodeInitTmrCtl();
void
*
tsStatusTimer
=
NULL
;
void
*
vnodeTmrCtrl
;
void
**
r
pcQhandle
;
void
**
tsR
pcQhandle
;
void
*
dmQhandle
;
void
*
q
ueryQhandle
;
void
*
tsQ
ueryQhandle
;
int32_t
tsVnodePeers
=
TSDB_VNODES_SUPPORT
-
1
;
int32_t
tsMaxQueues
;
uint32_t
tsRebootTime
;
...
...
@@ -95,6 +95,7 @@ void dnodeCleanUpSystem() {
tsStatusTimer
=
NULL
;
}
dnodeCleanupShell
();
dnodeCleanUpModules
();
dnodeCleanupVnodes
();
taosCloseLogger
();
...
...
@@ -269,7 +270,7 @@ static int32_t dnodeInitQueryQHandle() {
int32_t
maxQueueSize
=
tsNumOfVnodesPerCore
*
tsNumOfCores
*
tsSessionsPerVnode
;
dTrace
(
"query task queue initialized, max slot:%d, task threads:%d"
,
maxQueueSize
,
numOfThreads
);
q
ueryQhandle
=
taosInitSchedulerWithInfo
(
maxQueueSize
,
numOfThreads
,
"query"
,
vnodeTmrCtrl
);
tsQ
ueryQhandle
=
taosInitSchedulerWithInfo
(
maxQueueSize
,
numOfThreads
,
"query"
,
vnodeTmrCtrl
);
return
0
;
}
...
...
@@ -291,10 +292,10 @@ static int32_t dnodeInitRpcQHandle() {
tsMaxQueues
=
1
;
}
r
pcQhandle
=
malloc
(
tsMaxQueues
*
sizeof
(
void
*
));
tsR
pcQhandle
=
malloc
(
tsMaxQueues
*
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
tsMaxQueues
;
++
i
)
{
r
pcQhandle
[
i
]
=
taosInitScheduler
(
tsSessionsPerVnode
,
1
,
"dnode"
);
tsR
pcQhandle
[
i
]
=
taosInitScheduler
(
tsSessionsPerVnode
,
1
,
"dnode"
);
}
dmQhandle
=
taosInitScheduler
(
tsSessionsPerVnode
,
1
,
"mgmt"
);
...
...
src/dnode/src/dnodeUtil.c
0 → 100644
浏览文件 @
24190a8f
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeUtil.h"
EVnodeStatus
dnodeGetVnodeStatus
(
int32_t
vnode
)
{
return
TSDB_VN_STATUS_MASTER
;
}
bool
dnodeCheckVnodeExist
(
int32_t
vnode
)
{
return
true
;
}
src/dnode/src/dnodeWrite.c
浏览文件 @
24190a8f
...
...
@@ -14,15 +14,26 @@
*/
#define _DEFAULT_SOURCE
#include "
dnodeWrite
.h"
#include "
os
.h"
#include "taoserror.h"
#include "tlog.h"
#include "dnodeWrite.h"
int32_t
dnodeCheckTableExist
(
char
*
tableId
)
{
return
0
;
}
int32_t
dnodeWriteData
(
SShellSubmitMsg
*
msg
)
{
return
0
;
void
dnodeWriteData
(
SShellSubmitMsg
*
pSubmit
,
void
*
pShellObj
,
void
(
*
callback
)(
SShellSubmitRspMsg
*
,
void
*
))
{
SShellSubmitRspMsg
result
=
{
0
};
int32_t
numOfSid
=
htonl
(
pSubmit
->
numOfSid
);
if
(
numOfSid
<=
0
)
{
dError
(
"invalid num of tables:%d"
,
numOfSid
);
result
.
code
=
TSDB_CODE_INVALID_QUERY_MSG
;
callback
(
&
result
,
pShellObj
);
}
//TODO: submit implementation
}
int32_t
dnodeCreateNormalTable
(
SCreateNormalTableMsg
*
table
)
{
...
...
src/inc/http.h
浏览文件 @
24190a8f
...
...
@@ -16,6 +16,10 @@
#ifndef TDENGINE_HTTP_H
#define TDENGINE_HTTP_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "tglobalcfg.h"
#include "tlog.h"
...
...
@@ -44,4 +48,8 @@
int32_t
httpGetReqCount
();
#ifdef __cplusplus
}
#endif
#endif
src/inc/taosmsg.h
浏览文件 @
24190a8f
...
...
@@ -271,6 +271,7 @@ typedef struct {
}
SSubmitMsg
;
typedef
struct
{
int32_t
vnode
;
int32_t
sid
;
int32_t
sversion
;
uint64_t
uid
;
...
...
@@ -279,12 +280,28 @@ typedef struct {
}
SShellSubmitBlock
;
typedef
struct
{
short
import
;
short
vnode
;
int8_t
import
;
int8_t
reserved
[
3
]
;
int32_t
numOfSid
;
/* total number of sid */
char
blks
[];
/* numOfSid blocks, each blocks for one meter */
}
SShellSubmitMsg
;
typedef
struct
{
int32_t
vnode
;
// vnode index of failed block
int32_t
sid
;
// table index of failed block
int32_t
code
;
// errorcode while write data to vnode, such as not created, dropped, no space, invalid table
}
SShellSubmitRspBlock
;
typedef
struct
{
int32_t
code
;
// 0-success, 1-inprogress, > 1 error code
int32_t
numOfRows
;
// number of records the client is trying to write
int32_t
affectedRows
;
// number of records actually written
int32_t
failedRows
;
// number of failed records (exclude duplicate records)
int32_t
numOfFailedBlocks
;
SShellSubmitRspBlock
*
failedBlocks
;
}
SShellSubmitRspMsg
;
typedef
struct
SSchema
{
uint8_t
type
;
char
name
[
TSDB_COL_NAME_LEN
];
...
...
src/mnode/src/mgmtSystem.c
浏览文件 @
24190a8f
...
...
@@ -40,7 +40,7 @@ void * mgmtStatisticTimer = NULL;
int
mgmtShellConns
=
0
;
int
mgmtDnodeConns
=
0
;
extern
void
*
pShellConn
;
extern
void
**
r
pcQhandle
;
extern
void
**
tsR
pcQhandle
;
extern
SMgmtIpList
mgmtIpList
;
extern
SMgmtIpList
mgmtPublicIpList
;
extern
char
mgmtIpStr
[
TSDB_MAX_MGMT_IPS
][
20
];
...
...
src/util/inc/tstatus.h
浏览文件 @
24190a8f
...
...
@@ -40,7 +40,7 @@ enum _TSDB_DB_STATUS {
TSDB_DB_STATUS_DROP_FROM_SDB
};
enum
_TSDB_VN_STATUS
{
typedef
enum
_TSDB_VN_STATUS
{
TSDB_VN_STATUS_OFFLINE
,
TSDB_VN_STATUS_CREATING
,
TSDB_VN_STATUS_UNSYNCED
,
...
...
@@ -48,7 +48,7 @@ enum _TSDB_VN_STATUS {
TSDB_VN_STATUS_MASTER
,
TSDB_VN_STATUS_CLOSING
,
TSDB_VN_STATUS_DELETING
,
};
}
EVnodeStatus
;
enum
_TSDB_VN_SYNC_STATUS
{
TSDB_VN_SYNC_STATUS_INIT
,
...
...
src/vnode/detail/inc/vnode.h
浏览文件 @
24190a8f
...
...
@@ -302,9 +302,8 @@ typedef struct {
// internal globals
extern
int
tsMeterSizeOnFile
;
extern
void
**
rpcQhandle
;
extern
void
*
q
ueryQhandle
;
extern
void
*
tsQ
ueryQhandle
;
extern
int
tsVnodePeers
;
extern
int
tsMaxVnode
;
extern
int
tsMaxQueues
;
...
...
src/vnode/detail/src/vnodeRead.c
浏览文件 @
24190a8f
...
...
@@ -696,7 +696,7 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
dTrace
(
"QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule"
,
pQInfo
,
pQInfo
->
refCount
);
taosScheduleTask
(
q
ueryQhandle
,
&
schedMsg
);
taosScheduleTask
(
tsQ
ueryQhandle
,
&
schedMsg
);
return
pQInfo
;
_error:
...
...
@@ -812,7 +812,7 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
dTrace
(
"QInfo:%p set query flag and prepare runtime environment completed, wait for schedule"
,
pQInfo
);
taosScheduleTask
(
q
ueryQhandle
,
&
schedMsg
);
taosScheduleTask
(
tsQ
ueryQhandle
,
&
schedMsg
);
return
pQInfo
;
_error:
...
...
@@ -912,7 +912,7 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
schedMsg
.
msg
=
NULL
;
schedMsg
.
thandle
=
(
void
*
)
1
;
schedMsg
.
ahandle
=
pQInfo
;
taosScheduleTask
(
q
ueryQhandle
,
&
schedMsg
);
taosScheduleTask
(
tsQ
ueryQhandle
,
&
schedMsg
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录