Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
47c3361e
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
47c3361e
编写于
1月 19, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/vnode
上级
3f4551fa
d8e6fd62
变更
10
显示空白变更内容
内联
并排
Showing
10 changed file
with
694 addition
and
413 deletion
+694
-413
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+120
-0
source/libs/transport/inc/transportInt.h
source/libs/transport/inc/transportInt.h
+48
-49
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+75
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+198
-0
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+117
-0
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+119
-352
source/libs/transport/test/rclient.c
source/libs/transport/test/rclient.c
+4
-2
source/libs/transport/test/transportTests.cc
source/libs/transport/test/transportTests.cc
+2
-1
tests/script/sh/massiveTable/deployCluster.sh
tests/script/sh/massiveTable/deployCluster.sh
+8
-7
tests/script/sh/massiveTable/setupDnodes.sh
tests/script/sh/massiveTable/setupDnodes.sh
+3
-2
未找到文件。
source/libs/transport/inc/transComm.h
0 → 100644
浏览文件 @
47c3361e
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#include <uv.h>
#include "lz4.h"
#include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
#include "tmsg.h"
#include "transportInt.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"
typedef
void
*
queue
[
2
];
/* Private macros. */
#define QUEUE_NEXT(q) (*(queue**)&((*(q))[0]))
#define QUEUE_PREV(q) (*(queue**)&((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
/* Initialize an empty queue. */
#define QUEUE_INIT(q) \
{ \
QUEUE_NEXT(q) = (q); \
QUEUE_PREV(q) = (q); \
}
/* Return true if the queue has no element. */
#define QUEUE_IS_EMPTY(q) ((const queue*)(q) == (const queue*)QUEUE_NEXT(q))
/* Insert an element at the back of a queue. */
#define QUEUE_PUSH(q, e) \
{ \
QUEUE_NEXT(e) = (q); \
QUEUE_PREV(e) = QUEUE_PREV(q); \
QUEUE_PREV_NEXT(e) = (e); \
QUEUE_PREV(q) = (e); \
}
/* Remove the given element from the queue. Any element can be removed at any *
* time. */
#define QUEUE_REMOVE(e) \
{ \
QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
}
/* Return the element at the front of the queue. */
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
/* Return the element at the back of the queue. */
#define QUEUE_TAIL(q) (QUEUE_PREV(q))
/* Iterate over the element of a queue. * Mutating the queue while iterating
* results in undefined behavior. */
#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q))
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
typedef
struct
{
SRpcInfo
*
pRpc
;
// associated SRpcInfo
SEpSet
epSet
;
// ip list provided by app
void
*
ahandle
;
// handle provided by app
struct
SRpcConn
*
pConn
;
// pConn allocated
tmsg_t
msgType
;
// message type
uint8_t
*
pCont
;
// content provided by app
int32_t
contLen
;
// content length
int32_t
code
;
// error code
int16_t
numOfTry
;
// number of try for different servers
int8_t
oldInUse
;
// server EP inUse passed by app
int8_t
redirect
;
// flag to indicate redirect
int8_t
connType
;
// connection type
int64_t
rid
;
// refId returned by taosAddRef
SRpcMsg
*
pRsp
;
// for synchronous API
tsem_t
*
pSem
;
// for synchronous API
SEpSet
*
pSet
;
// for synchronous API
char
msg
[
0
];
// RpcHead starts from here
}
SRpcReqContext
;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext))
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)
int
rpcAuthenticateMsg
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
);
void
rpcBuildAuthHead
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
);
int32_t
rpcCompressRpcMsg
(
char
*
pCont
,
int32_t
contLen
);
SRpcHead
*
rpcDecompressRpcMsg
(
SRpcHead
*
pHead
);
#endif
source/libs/transport/inc/transportInt.h
浏览文件 @
47c3361e
...
@@ -16,62 +16,61 @@
...
@@ -16,62 +16,61 @@
#ifndef _TD_TRANSPORT_INT_H_
#ifndef _TD_TRANSPORT_INT_H_
#define _TD_TRANSPORT_INT_H_
#define _TD_TRANSPORT_INT_H_
#ifdef USE_UV
#include <uv.h>
#endif
#include "lz4.h"
#include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
#endif
#endif
#ifdef USE_UV
#ifdef USE_UV
#include <stddef.h>
void
*
taosInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
typedef
void
*
queue
[
2
];
void
*
taosInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
/* Private macros. */
typedef
struct
{
#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0]))
int
sessions
;
// number of sessions allowed
#define QUEUE_PREV(q) (*(queue **)&((*(q))[1]))
int
numOfThreads
;
// number of threads to process incoming messages
int
idleTime
;
// milliseconds;
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
uint16_t
localPort
;
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
int8_t
connType
;
int64_t
index
;
/* Initialize an empty queue. */
char
label
[
TSDB_LABEL_LEN
];
#define QUEUE_INIT(q) \
{ \
char
user
[
TSDB_UNI_LEN
];
// meter ID
QUEUE_NEXT(q) = (q); \
char
spi
;
// security parameter index
QUEUE_PREV(q) = (q); \
char
encrypt
;
// encrypt algorithm
}
char
secret
[
TSDB_PASSWORD_LEN
];
// secret for the link
char
ckey
[
TSDB_PASSWORD_LEN
];
// ciphering key
/* Return true if the queue has no element. */
#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q))
void
(
*
cfp
)(
void
*
parent
,
SRpcMsg
*
,
SEpSet
*
);
int
(
*
afp
)(
void
*
parent
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
/* Insert an element at the back of a queue. */
#define QUEUE_PUSH(q, e) \
int32_t
refCount
;
{ \
void
*
parent
;
QUEUE_NEXT(e) = (q); \
void
*
idPool
;
// handle to ID pool
QUEUE_PREV(e) = QUEUE_PREV(q); \
void
*
tmrCtrl
;
// handle to timer
QUEUE_PREV_NEXT(e) = (e); \
SHashObj
*
hash
;
// handle returned by hash utility
QUEUE_PREV(q) = (e); \
void
*
tcphandle
;
// returned handle from TCP initialization
}
pthread_mutex_t
mutex
;
}
SRpcInfo
;
/* Remove the given element from the queue. Any element can be removed at any *
* time. */
#define QUEUE_REMOVE(e) \
{ \
QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
}
/* Return the element at the front of the queue. */
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
/* Return the element at the back of the queue. */
#define QUEUE_TAIL(q) (QUEUE_PREV(q))
/* Iterate over the element of a queue. * Mutating the queue while iterating
* results in undefined behavior. */
#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q))
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field))))
#endif // USE_LIBUV
#endif // USE_LIBUV
...
...
source/libs/transport/src/trans.c
0 → 100644
浏览文件 @
47c3361e
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#include "transComm.h"
typedef
struct
SConnBuffer
{
char
*
buf
;
int
len
;
int
cap
;
int
left
;
}
SConnBuffer
;
void
*
(
*
taosHandle
[])(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
=
{
taosInitServer
,
taosInitClient
};
void
*
rpcOpen
(
const
SRpcInit
*
pInit
)
{
SRpcInfo
*
pRpc
=
calloc
(
1
,
sizeof
(
SRpcInfo
));
if
(
pRpc
==
NULL
)
{
return
NULL
;
}
if
(
pInit
->
label
)
{
tstrncpy
(
pRpc
->
label
,
pInit
->
label
,
strlen
(
pInit
->
label
));
}
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
pRpc
->
connType
=
pInit
->
connType
;
pRpc
->
tcphandle
=
(
*
taosHandle
[
pRpc
->
connType
])(
0
,
pInit
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
NULL
,
pRpc
);
return
pRpc
;
}
void
rpcClose
(
void
*
arg
)
{
return
;
}
void
*
rpcMallocCont
(
int
contLen
)
{
int
size
=
contLen
+
RPC_MSG_OVERHEAD
;
char
*
start
=
(
char
*
)
calloc
(
1
,
(
size_t
)
size
);
if
(
start
==
NULL
)
{
tError
(
"failed to malloc msg, size:%d"
,
size
);
return
NULL
;
}
else
{
tTrace
(
"malloc mem:%p size:%d"
,
start
,
size
);
}
return
start
+
sizeof
(
SRpcReqContext
)
+
sizeof
(
SRpcHead
);
}
void
rpcFreeCont
(
void
*
cont
)
{
return
;
}
void
*
rpcReallocCont
(
void
*
ptr
,
int
contLen
)
{
return
NULL
;
}
void
rpcSendRedirectRsp
(
void
*
pConn
,
const
SEpSet
*
pEpSet
)
{}
int
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
)
{
return
-
1
;
}
void
rpcSendRecv
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
)
{
return
;
}
int
rpcReportProgress
(
void
*
pConn
,
char
*
pCont
,
int
contLen
)
{
return
-
1
;
}
void
rpcCancelRequest
(
int64_t
rid
)
{
return
;
}
int32_t
rpcInit
(
void
)
{
// impl later
return
-
1
;
}
void
rpcCleanup
(
void
)
{
// impl later
return
;
}
#endif
source/libs/transport/src/transCli.c
0 → 100644
浏览文件 @
47c3361e
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#include "transComm.h"
typedef
struct
SCliConn
{
uv_connect_t
connReq
;
uv_stream_t
*
stream
;
void
*
data
;
queue
conn
;
}
SCliConn
;
typedef
struct
SCliMsg
{
SRpcReqContext
*
context
;
queue
q
;
}
SCliMsg
;
typedef
struct
SCliThrdObj
{
pthread_t
thread
;
uv_loop_t
*
loop
;
uv_async_t
*
cliAsync
;
//
void
*
cache
;
// conn pool
queue
msg
;
pthread_mutex_t
msgMtx
;
void
*
shandle
;
}
SCliThrdObj
;
typedef
struct
SClientObj
{
char
label
[
TSDB_LABEL_LEN
];
int32_t
index
;
int
numOfThreads
;
SCliThrdObj
**
pThreadObj
;
}
SClientObj
;
static
void
clientWriteCb
(
uv_write_t
*
req
,
int
status
);
static
void
clientReadCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
);
static
void
clientConnCb
(
struct
uv_connect_s
*
req
,
int
status
);
static
void
clientAsyncCb
(
uv_async_t
*
handle
);
static
void
*
clientThread
(
void
*
arg
);
static
void
clientWriteCb
(
uv_write_t
*
req
,
int
status
)
{
// impl later
}
static
void
clientFailedCb
(
uv_handle_t
*
handle
)
{
// impl later
tDebug
(
"close handle"
);
}
static
void
clientReadCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
// impl later
}
static
void
clientConnCb
(
struct
uv_connect_s
*
req
,
int
status
)
{
SCliConn
*
pConn
=
req
->
data
;
SCliMsg
*
pMsg
=
pConn
->
data
;
SEpSet
*
pEpSet
=
&
pMsg
->
context
->
epSet
;
char
*
fqdn
=
pEpSet
->
fqdn
[
pEpSet
->
inUse
];
uint32_t
port
=
pEpSet
->
port
[
pEpSet
->
inUse
];
if
(
status
!=
0
)
{
// call user fp later
tError
(
"failed to connect server(%s, %d), errmsg: %s"
,
fqdn
,
port
,
uv_strerror
(
status
));
uv_close
((
uv_handle_t
*
)
req
->
handle
,
clientFailedCb
);
return
;
}
assert
(
pConn
->
stream
==
req
->
handle
);
// impl later
}
static
SCliConn
*
getConnFromCache
(
void
*
cache
,
char
*
ip
,
uint32_t
port
)
{
// impl later
return
NULL
;
}
static
void
clientAsyncCb
(
uv_async_t
*
handle
)
{
SCliThrdObj
*
pThrd
=
handle
->
data
;
SCliMsg
*
pMsg
=
NULL
;
pthread_mutex_lock
(
&
pThrd
->
msgMtx
);
if
(
!
QUEUE_IS_EMPTY
(
&
pThrd
->
msg
))
{
queue
*
head
=
QUEUE_HEAD
(
&
pThrd
->
msg
);
pMsg
=
QUEUE_DATA
(
head
,
SCliMsg
,
q
);
QUEUE_REMOVE
(
head
);
}
pthread_mutex_unlock
(
&
pThrd
->
msgMtx
);
SEpSet
*
pEpSet
=
&
pMsg
->
context
->
epSet
;
char
*
fqdn
=
pEpSet
->
fqdn
[
pEpSet
->
inUse
];
uint32_t
port
=
pEpSet
->
port
[
pEpSet
->
inUse
];
SCliConn
*
conn
=
getConnFromCache
(
pThrd
->
cache
,
fqdn
,
port
);
if
(
conn
!=
NULL
)
{
// impl later
}
else
{
SCliConn
*
conn
=
malloc
(
sizeof
(
SCliConn
));
conn
->
stream
=
(
uv_stream_t
*
)
malloc
(
sizeof
(
uv_tcp_t
));
uv_tcp_init
(
pThrd
->
loop
,
(
uv_tcp_t
*
)(
conn
->
stream
));
conn
->
connReq
.
data
=
conn
;
conn
->
data
=
pMsg
;
struct
sockaddr_in
addr
;
uv_ip4_addr
(
fqdn
,
port
,
&
addr
);
// handle error in callback if connect error
uv_tcp_connect
(
&
conn
->
connReq
,
(
uv_tcp_t
*
)(
conn
->
stream
),
(
const
struct
sockaddr
*
)
&
addr
,
clientConnCb
);
}
// SRpcReqContext* pCxt = pMsg->context;
// SRpcHead* pHead = rpcHeadFromCont(pCtx->pCont);
// char* msg = (char*)pHead;
// int len = rpcMsgLenFromCont(pCtx->contLen);
// tmsg_t msgType = pCtx->msgType;
// impl later
}
static
void
*
clientThread
(
void
*
arg
)
{
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
arg
;
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
}
void
*
taosInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SClientObj
*
cli
=
calloc
(
1
,
sizeof
(
SClientObj
));
memcpy
(
cli
->
label
,
label
,
strlen
(
label
));
cli
->
numOfThreads
=
numOfThreads
;
cli
->
pThreadObj
=
(
SCliThrdObj
**
)
calloc
(
cli
->
numOfThreads
,
sizeof
(
SCliThrdObj
*
));
for
(
int
i
=
0
;
i
<
cli
->
numOfThreads
;
i
++
)
{
SCliThrdObj
*
pThrd
=
(
SCliThrdObj
*
)
calloc
(
1
,
sizeof
(
SCliThrdObj
));
QUEUE_INIT
(
&
pThrd
->
msg
);
pthread_mutex_init
(
&
pThrd
->
msgMtx
,
NULL
);
// QUEUE_INIT(&pThrd->clientCache);
pThrd
->
loop
=
(
uv_loop_t
*
)
malloc
(
sizeof
(
uv_loop_t
));
uv_loop_init
(
pThrd
->
loop
);
pThrd
->
cliAsync
=
malloc
(
sizeof
(
uv_async_t
));
uv_async_init
(
pThrd
->
loop
,
pThrd
->
cliAsync
,
clientAsyncCb
);
pThrd
->
cliAsync
->
data
=
pThrd
;
pThrd
->
shandle
=
shandle
;
int
err
=
pthread_create
(
&
pThrd
->
thread
,
NULL
,
clientThread
,
(
void
*
)(
pThrd
));
if
(
err
==
0
)
{
tDebug
(
"sucess to create tranport-client thread %d"
,
i
);
}
cli
->
pThreadObj
[
i
]
=
pThrd
;
}
return
cli
;
}
void
rpcSendRequest
(
void
*
shandle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
int64_t
*
pRid
)
{
// impl later
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
shandle
;
int
len
=
rpcCompressRpcMsg
(
pMsg
->
pCont
,
pMsg
->
contLen
);
SRpcReqContext
*
pContext
;
pContext
=
(
SRpcReqContext
*
)((
char
*
)
pMsg
->
pCont
-
sizeof
(
SRpcHead
)
-
sizeof
(
SRpcReqContext
));
pContext
->
ahandle
=
pMsg
->
ahandle
;
pContext
->
pRpc
=
(
SRpcInfo
*
)
shandle
;
pContext
->
epSet
=
*
pEpSet
;
pContext
->
contLen
=
len
;
pContext
->
pCont
=
pMsg
->
pCont
;
pContext
->
msgType
=
pMsg
->
msgType
;
pContext
->
oldInUse
=
pEpSet
->
inUse
;
assert
(
pRpc
->
connType
==
TAOS_CONN_CLIENT
);
// atomic or not
int64_t
index
=
pRpc
->
index
;
if
(
pRpc
->
index
++
>=
pRpc
->
numOfThreads
)
{
pRpc
->
index
=
0
;
}
SCliMsg
*
msg
=
malloc
(
sizeof
(
SCliMsg
));
msg
->
context
=
pContext
;
SCliThrdObj
*
thrd
=
((
SClientObj
*
)
pRpc
->
tcphandle
)
->
pThreadObj
[
index
%
pRpc
->
numOfThreads
];
pthread_mutex_lock
(
&
thrd
->
msgMtx
);
QUEUE_PUSH
(
&
thrd
->
msg
,
&
msg
->
q
);
pthread_mutex_unlock
(
&
thrd
->
msgMtx
);
uv_async_send
(
thrd
->
cliAsync
);
}
#endif
source/libs/transport/src/transComm.c
0 → 100644
浏览文件 @
47c3361e
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#include "transComm.h"
int
rpcAuthenticateMsg
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
)
{
T_MD5_CTX
context
;
int
ret
=
-
1
;
tMD5Init
(
&
context
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_PASSWORD_LEN
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
pMsg
,
msgLen
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_PASSWORD_LEN
);
tMD5Final
(
&
context
);
if
(
memcmp
(
context
.
digest
,
pAuth
,
sizeof
(
context
.
digest
))
==
0
)
ret
=
0
;
return
ret
;
}
void
rpcBuildAuthHead
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
)
{
T_MD5_CTX
context
;
tMD5Init
(
&
context
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_PASSWORD_LEN
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
pMsg
,
msgLen
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_PASSWORD_LEN
);
tMD5Final
(
&
context
);
memcpy
(
pAuth
,
context
.
digest
,
sizeof
(
context
.
digest
));
}
int32_t
rpcCompressRpcMsg
(
char
*
pCont
,
int32_t
contLen
)
{
SRpcHead
*
pHead
=
rpcHeadFromCont
(
pCont
);
int32_t
finalLen
=
0
;
int
overhead
=
sizeof
(
SRpcComp
);
if
(
!
NEEDTO_COMPRESSS_MSG
(
contLen
))
{
return
contLen
;
}
char
*
buf
=
malloc
(
contLen
+
overhead
+
8
);
// 8 extra bytes
if
(
buf
==
NULL
)
{
tError
(
"failed to allocate memory for rpc msg compression, contLen:%d"
,
contLen
);
return
contLen
;
}
int32_t
compLen
=
LZ4_compress_default
(
pCont
,
buf
,
contLen
,
contLen
+
overhead
);
tDebug
(
"compress rpc msg, before:%d, after:%d, overhead:%d"
,
contLen
,
compLen
,
overhead
);
/*
* only the compressed size is less than the value of contLen - overhead, the compression is applied
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/
if
(
compLen
>
0
&&
compLen
<
contLen
-
overhead
)
{
SRpcComp
*
pComp
=
(
SRpcComp
*
)
pCont
;
pComp
->
reserved
=
0
;
pComp
->
contLen
=
htonl
(
contLen
);
memcpy
(
pCont
+
overhead
,
buf
,
compLen
);
pHead
->
comp
=
1
;
tDebug
(
"compress rpc msg, before:%d, after:%d"
,
contLen
,
compLen
);
finalLen
=
compLen
+
overhead
;
}
else
{
finalLen
=
contLen
;
}
free
(
buf
);
return
finalLen
;
}
SRpcHead
*
rpcDecompressRpcMsg
(
SRpcHead
*
pHead
)
{
int
overhead
=
sizeof
(
SRpcComp
);
SRpcHead
*
pNewHead
=
NULL
;
uint8_t
*
pCont
=
pHead
->
content
;
SRpcComp
*
pComp
=
(
SRpcComp
*
)
pHead
->
content
;
if
(
pHead
->
comp
)
{
// decompress the content
assert
(
pComp
->
reserved
==
0
);
int
contLen
=
htonl
(
pComp
->
contLen
);
// prepare the temporary buffer to decompress message
char
*
temp
=
(
char
*
)
malloc
(
contLen
+
RPC_MSG_OVERHEAD
);
pNewHead
=
(
SRpcHead
*
)(
temp
+
sizeof
(
SRpcReqContext
));
// reserve SRpcReqContext
if
(
pNewHead
)
{
int
compLen
=
rpcContLenFromMsg
(
pHead
->
msgLen
)
-
overhead
;
int
origLen
=
LZ4_decompress_safe
((
char
*
)(
pCont
+
overhead
),
(
char
*
)
pNewHead
->
content
,
compLen
,
contLen
);
assert
(
origLen
==
contLen
);
memcpy
(
pNewHead
,
pHead
,
sizeof
(
SRpcHead
));
pNewHead
->
msgLen
=
rpcMsgLenFromCont
(
origLen
);
/// rpcFreeMsg(pHead); // free the compressed message buffer
pHead
=
pNewHead
;
tTrace
(
"decomp malloc mem:%p"
,
temp
);
}
else
{
tError
(
"failed to allocate memory to decompress msg, contLen:%d"
,
contLen
);
}
}
return
pHead
;
}
#endif
source/libs/transport/src/trans
port
.c
→
source/libs/transport/src/trans
Srv
.c
浏览文件 @
47c3361e
...
@@ -14,118 +14,7 @@
...
@@ -14,118 +14,7 @@
*/
*/
#ifdef USE_UV
#ifdef USE_UV
#include "transComm.h"
#include <uv.h>
#include "lz4.h"
#include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
#include "tmsg.h"
#include "transportInt.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext))
static
const
char
*
notify
=
"a"
;
typedef
struct
{
int
sessions
;
// number of sessions allowed
int
numOfThreads
;
// number of threads to process incoming messages
int
idleTime
;
// milliseconds;
uint16_t
localPort
;
int8_t
connType
;
int
index
;
// for UDP server only, round robin for multiple threads
char
label
[
TSDB_LABEL_LEN
];
char
user
[
TSDB_UNI_LEN
];
// meter ID
char
spi
;
// security parameter index
char
encrypt
;
// encrypt algorithm
char
secret
[
TSDB_PASSWORD_LEN
];
// secret for the link
char
ckey
[
TSDB_PASSWORD_LEN
];
// ciphering key
void
(
*
cfp
)(
void
*
parent
,
SRpcMsg
*
,
SEpSet
*
);
int
(
*
afp
)(
void
*
parent
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
int32_t
refCount
;
void
*
parent
;
void
*
idPool
;
// handle to ID pool
void
*
tmrCtrl
;
// handle to timer
SHashObj
*
hash
;
// handle returned by hash utility
void
*
tcphandle
;
// returned handle from TCP initialization
void
*
udphandle
;
// returned handle from UDP initialization
void
*
pCache
;
// connection cache
pthread_mutex_t
mutex
;
struct
SRpcConn
*
connList
;
// connection list
}
SRpcInfo
;
typedef
struct
{
SRpcInfo
*
pRpc
;
// associated SRpcInfo
SEpSet
epSet
;
// ip list provided by app
void
*
ahandle
;
// handle provided by app
struct
SRpcConn
*
pConn
;
// pConn allocated
tmsg_t
msgType
;
// message type
uint8_t
*
pCont
;
// content provided by app
int32_t
contLen
;
// content length
int32_t
code
;
// error code
int16_t
numOfTry
;
// number of try for different servers
int8_t
oldInUse
;
// server EP inUse passed by app
int8_t
redirect
;
// flag to indicate redirect
int8_t
connType
;
// connection type
int64_t
rid
;
// refId returned by taosAddRef
SRpcMsg
*
pRsp
;
// for synchronous API
tsem_t
*
pSem
;
// for synchronous API
SEpSet
*
pSet
;
// for synchronous API
char
msg
[
0
];
// RpcHead starts from here
}
SRpcReqContext
;
typedef
struct
SThreadObj
{
pthread_t
thread
;
uv_pipe_t
*
pipe
;
int
fd
;
uv_loop_t
*
loop
;
uv_async_t
*
workerAsync
;
//
queue
conn
;
pthread_mutex_t
connMtx
;
void
*
shandle
;
}
SThreadObj
;
typedef
struct
SClientObj
{
char
label
[
TSDB_LABEL_LEN
];
int32_t
index
;
int
numOfThreads
;
SThreadObj
**
pThreadObj
;
}
SClientObj
;
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)
typedef
struct
SServerObj
{
pthread_t
thread
;
uv_tcp_t
server
;
uv_loop_t
*
loop
;
int
workerIdx
;
int
numOfThreads
;
SThreadObj
**
pThreadObj
;
uv_pipe_t
**
pipe
;
uint32_t
ip
;
uint32_t
port
;
}
SServerObj
;
typedef
struct
SConnBuffer
{
typedef
struct
SConnBuffer
{
char
*
buf
;
char
*
buf
;
...
@@ -134,7 +23,7 @@ typedef struct SConnBuffer {
...
@@ -134,7 +23,7 @@ typedef struct SConnBuffer {
int
left
;
int
left
;
}
SConnBuffer
;
}
SConnBuffer
;
typedef
struct
S
Rpc
Conn
{
typedef
struct
SConn
{
uv_tcp_t
*
pTcp
;
uv_tcp_t
*
pTcp
;
uv_write_t
*
pWriter
;
uv_write_t
*
pWriter
;
uv_timer_t
*
pTimer
;
uv_timer_t
*
pTimer
;
...
@@ -148,7 +37,7 @@ typedef struct SRpcConn {
...
@@ -148,7 +37,7 @@ typedef struct SRpcConn {
int
count
;
int
count
;
void
*
shandle
;
// rpc init
void
*
shandle
;
// rpc init
void
*
ahandle
;
//
void
*
ahandle
;
//
void
*
hostThr
ea
d
;
void
*
hostThrd
;
// del later
// del later
char
secured
;
char
secured
;
int
spi
;
int
spi
;
...
@@ -156,16 +45,37 @@ typedef struct SRpcConn {
...
@@ -156,16 +45,37 @@ typedef struct SRpcConn {
char
user
[
TSDB_UNI_LEN
];
// user ID for the link
char
user
[
TSDB_UNI_LEN
];
// user ID for the link
char
secret
[
TSDB_PASSWORD_LEN
];
char
secret
[
TSDB_PASSWORD_LEN
];
char
ckey
[
TSDB_PASSWORD_LEN
];
// ciphering key
char
ckey
[
TSDB_PASSWORD_LEN
];
// ciphering key
}
SRpcConn
;
}
SConn
;
typedef
struct
SWorkThrdObj
{
pthread_t
thread
;
uv_pipe_t
*
pipe
;
int
fd
;
uv_loop_t
*
loop
;
uv_async_t
*
workerAsync
;
//
queue
conn
;
pthread_mutex_t
connMtx
;
void
*
shandle
;
}
SWorkThrdObj
;
// auth function
typedef
struct
SServerObj
{
static
int
uvAuthMsg
(
SRpcConn
*
pConn
,
char
*
msg
,
int
msgLen
);
pthread_t
thread
;
static
int
rpcAuthenticateMsg
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
);
uv_tcp_t
server
;
static
void
rpcBuildAuthHead
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
);
uv_loop_t
*
loop
;
static
int
rpcAddAuthPart
(
SRpcConn
*
pConn
,
char
*
msg
,
int
msgLen
);
int
workerIdx
;
// compress data
int
numOfThreads
;
static
int32_t
rpcCompressRpcMsg
(
char
*
pCont
,
int32_t
contLen
);
SWorkThrdObj
**
pThreadObj
;
static
SRpcHead
*
rpcDecompressRpcMsg
(
SRpcHead
*
pHead
);
uv_pipe_t
**
pipe
;
uint32_t
ip
;
uint32_t
port
;
}
SServerObj
;
static
const
char
*
notify
=
"a"
;
// refactor later
static
int
rpcAddAuthPart
(
SConn
*
pConn
,
char
*
msg
,
int
msgLen
);
static
int
uvAuthMsg
(
SConn
*
pConn
,
char
*
msg
,
int
msgLen
);
static
void
uvAllocConnBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
static
void
uvAllocConnBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
static
void
uvAllocReadBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
static
void
uvAllocReadBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
);
...
@@ -176,79 +86,17 @@ static void uvOnAcceptCb(uv_stream_t* stream, int status);
...
@@ -176,79 +86,17 @@ static void uvOnAcceptCb(uv_stream_t* stream, int status);
static
void
uvOnConnectionCb
(
uv_stream_t
*
q
,
ssize_t
nread
,
const
uv_buf_t
*
buf
);
static
void
uvOnConnectionCb
(
uv_stream_t
*
q
,
ssize_t
nread
,
const
uv_buf_t
*
buf
);
static
void
uvWorkerAsyncCb
(
uv_async_t
*
handle
);
static
void
uvWorkerAsyncCb
(
uv_async_t
*
handle
);
static
SRpcConn
*
connCreate
();
// already read complete packet
static
void
connDestroy
(
SRpcConn
*
conn
);
static
bool
readComplete
(
SConnBuffer
*
buf
);
static
SConn
*
connCreate
();
static
void
connDestroy
(
SConn
*
conn
);
static
void
uvConnDestroy
(
uv_handle_t
*
handle
);
static
void
uvConnDestroy
(
uv_handle_t
*
handle
);
// server worke thread
static
void
*
workerThread
(
void
*
arg
);
static
void
*
workerThread
(
void
*
arg
);
static
void
*
acceptThread
(
void
*
arg
);
static
void
*
acceptThread
(
void
*
arg
);
void
*
taosInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
*
taosInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
*
(
*
taosHandle
[])(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
=
{
taosInitServer
,
taosInitClient
};
void
*
taosInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SClientObj
*
cli
=
calloc
(
1
,
sizeof
(
SClientObj
));
memcpy
(
cli
->
label
,
label
,
strlen
(
label
));
cli
->
numOfThreads
=
numOfThreads
;
cli
->
pThreadObj
=
(
SThreadObj
**
)
calloc
(
cli
->
numOfThreads
,
sizeof
(
SThreadObj
*
));
for
(
int
i
=
0
;
i
<
cli
->
numOfThreads
;
i
++
)
{
SThreadObj
*
thrd
=
(
SThreadObj
*
)
calloc
(
1
,
sizeof
(
SThreadObj
));
int
err
=
pthread_create
(
&
thrd
->
thread
,
NULL
,
workerThread
,
(
void
*
)(
thrd
));
if
(
err
==
0
)
{
tDebug
(
"sucess to create tranport-client thread %d"
,
i
);
}
}
return
cli
;
}
void
*
taosInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SServerObj
*
srv
=
calloc
(
1
,
sizeof
(
SServerObj
));
srv
->
loop
=
(
uv_loop_t
*
)
malloc
(
sizeof
(
uv_loop_t
));
srv
->
numOfThreads
=
numOfThreads
;
srv
->
workerIdx
=
0
;
srv
->
pThreadObj
=
(
SThreadObj
**
)
calloc
(
srv
->
numOfThreads
,
sizeof
(
SThreadObj
*
));
srv
->
pipe
=
(
uv_pipe_t
**
)
calloc
(
srv
->
numOfThreads
,
sizeof
(
uv_pipe_t
*
));
srv
->
ip
=
ip
;
srv
->
port
=
port
;
uv_loop_init
(
srv
->
loop
);
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
SThreadObj
*
thrd
=
(
SThreadObj
*
)
calloc
(
1
,
sizeof
(
SThreadObj
));
srv
->
pipe
[
i
]
=
(
uv_pipe_t
*
)
calloc
(
2
,
sizeof
(
uv_pipe_t
));
int
fds
[
2
];
if
(
uv_socketpair
(
AF_UNIX
,
SOCK_STREAM
,
fds
,
UV_NONBLOCK_PIPE
,
UV_NONBLOCK_PIPE
)
!=
0
)
{
return
NULL
;
}
uv_pipe_init
(
srv
->
loop
,
&
(
srv
->
pipe
[
i
][
0
]),
1
);
uv_pipe_open
(
&
(
srv
->
pipe
[
i
][
0
]),
fds
[
1
]);
// init write
thrd
->
shandle
=
shandle
;
thrd
->
fd
=
fds
[
0
];
thrd
->
pipe
=
&
(
srv
->
pipe
[
i
][
1
]);
// init read
int
err
=
pthread_create
(
&
(
thrd
->
thread
),
NULL
,
workerThread
,
(
void
*
)(
thrd
));
if
(
err
==
0
)
{
tDebug
(
"sucess to create worker-thread %d"
,
i
);
// printf("thread %d create\n", i);
}
else
{
// TODO: clear all other resource later
tError
(
"failed to create worker-thread %d"
,
i
);
}
srv
->
pThreadObj
[
i
]
=
thrd
;
}
int
err
=
pthread_create
(
&
srv
->
thread
,
NULL
,
acceptThread
,
(
void
*
)
srv
);
if
(
err
==
0
)
{
tDebug
(
"success to create accept-thread"
);
}
else
{
// clear all resource later
}
return
srv
;
}
void
uvAllocReadBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
void
uvAllocReadBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
/*
/*
* formate of data buffer:
* formate of data buffer:
...
@@ -256,8 +104,8 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
...
@@ -256,8 +104,8 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
*/
*/
static
const
int
CAPACITY
=
1024
;
static
const
int
CAPACITY
=
1024
;
S
RpcConn
*
ctx
=
handle
->
data
;
S
Conn
*
conn
=
handle
->
data
;
SConnBuffer
*
pBuf
=
&
c
tx
->
connBuf
;
SConnBuffer
*
pBuf
=
&
c
onn
->
connBuf
;
if
(
pBuf
->
cap
==
0
)
{
if
(
pBuf
->
cap
==
0
)
{
pBuf
->
buf
=
(
char
*
)
calloc
(
CAPACITY
+
RPC_RESERVE_SIZE
,
sizeof
(
char
));
pBuf
->
buf
=
(
char
*
)
calloc
(
CAPACITY
+
RPC_RESERVE_SIZE
,
sizeof
(
char
));
pBuf
->
len
=
0
;
pBuf
->
len
=
0
;
...
@@ -280,9 +128,10 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
...
@@ -280,9 +128,10 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
buf
->
len
=
pBuf
->
cap
-
pBuf
->
len
;
buf
->
len
=
pBuf
->
cap
-
pBuf
->
len
;
}
}
}
}
// check data read from socket completely or not
// check data read from socket completely or not
//
//
static
bool
isReadAll
(
SConnBuffer
*
data
)
{
static
bool
readComplete
(
SConnBuffer
*
data
)
{
// TODO(yihao): handle pipeline later
// TODO(yihao): handle pipeline later
SRpcHead
rpcHead
;
SRpcHead
rpcHead
;
int32_t
headLen
=
sizeof
(
rpcHead
);
int32_t
headLen
=
sizeof
(
rpcHead
);
...
@@ -299,10 +148,11 @@ static bool isReadAll(SConnBuffer* data) {
...
@@ -299,10 +148,11 @@ static bool isReadAll(SConnBuffer* data) {
return
false
;
return
false
;
}
}
}
}
static
void
uvDoProcess
(
SRecvInfo
*
pRecv
)
{
static
void
uvDoProcess
(
SRecvInfo
*
pRecv
)
{
SRpcHead
*
pHead
=
(
SRpcHead
*
)
pRecv
->
msg
;
SRpcHead
*
pHead
=
(
SRpcHead
*
)
pRecv
->
msg
;
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
pRecv
->
shandle
;
SRpcInfo
*
pRpc
=
(
SRpcInfo
*
)
pRecv
->
shandle
;
S
RpcConn
*
pConn
=
pRecv
->
thandle
;
S
Conn
*
pConn
=
pRecv
->
thandle
;
tDump
(
pRecv
->
msg
,
pRecv
->
msgLen
);
tDump
(
pRecv
->
msg
,
pRecv
->
msgLen
);
...
@@ -311,7 +161,8 @@ static void uvDoProcess(SRecvInfo* pRecv) {
...
@@ -311,7 +161,8 @@ static void uvDoProcess(SRecvInfo* pRecv) {
// do auth and check
// do auth and check
}
}
static
int
uvAuthMsg
(
SRpcConn
*
pConn
,
char
*
msg
,
int
len
)
{
static
int
uvAuthMsg
(
SConn
*
pConn
,
char
*
msg
,
int
len
)
{
SRpcHead
*
pHead
=
(
SRpcHead
*
)
msg
;
SRpcHead
*
pHead
=
(
SRpcHead
*
)
msg
;
int
code
=
0
;
int
code
=
0
;
...
@@ -325,7 +176,8 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) {
...
@@ -325,7 +176,8 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) {
if
(
!
rpcIsReq
(
pHead
->
msgType
))
{
if
(
!
rpcIsReq
(
pHead
->
msgType
))
{
// for response, if code is auth failure, it shall bypass the auth process
// for response, if code is auth failure, it shall bypass the auth process
code
=
htonl
(
pHead
->
code
);
code
=
htonl
(
pHead
->
code
);
if
(
code
==
TSDB_CODE_RPC_INVALID_TIME_STAMP
||
code
==
TSDB_CODE_RPC_AUTH_FAILURE
||
code
==
TSDB_CODE_RPC_INVALID_VERSION
||
code
==
TSDB_CODE_RPC_AUTH_REQUIRED
||
if
(
code
==
TSDB_CODE_RPC_INVALID_TIME_STAMP
||
code
==
TSDB_CODE_RPC_AUTH_FAILURE
||
code
==
TSDB_CODE_RPC_INVALID_VERSION
||
code
==
TSDB_CODE_RPC_AUTH_REQUIRED
||
code
==
TSDB_CODE_MND_USER_NOT_EXIST
||
code
==
TSDB_CODE_RPC_NOT_READY
)
{
code
==
TSDB_CODE_MND_USER_NOT_EXIST
||
code
==
TSDB_CODE_RPC_NOT_READY
)
{
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
pHead
->
msgLen
);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
...
@@ -361,12 +213,14 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) {
...
@@ -361,12 +213,14 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) {
return
code
;
return
code
;
}
}
// refers specifically to query or insert timeout
// refers specifically to query or insert timeout
static
void
uvHandleActivityTimeout
(
uv_timer_t
*
handle
)
{
static
void
uvHandleActivityTimeout
(
uv_timer_t
*
handle
)
{
// impl later
// impl later
S
Rpc
Conn
*
conn
=
handle
->
data
;
SConn
*
conn
=
handle
->
data
;
}
}
static
void
uvProcessData
(
SRpcConn
*
pConn
)
{
static
void
uvProcessData
(
SConn
*
pConn
)
{
SRecvInfo
info
;
SRecvInfo
info
;
SRecvInfo
*
p
=
&
info
;
SRecvInfo
*
p
=
&
info
;
SConnBuffer
*
pBuf
=
&
pConn
->
connBuf
;
SConnBuffer
*
pBuf
=
&
pConn
->
connBuf
;
...
@@ -408,13 +262,14 @@ static void uvProcessData(SRpcConn* pConn) {
...
@@ -408,13 +262,14 @@ static void uvProcessData(SRpcConn* pConn) {
// auth
// auth
// validate msg type
// validate msg type
}
}
void
uvOnReadCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
void
uvOnReadCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
// opt
// opt
S
RpcConn
*
ctx
=
cli
->
data
;
S
Conn
*
ctx
=
cli
->
data
;
SConnBuffer
*
pBuf
=
&
ctx
->
connBuf
;
SConnBuffer
*
pBuf
=
&
ctx
->
connBuf
;
if
(
nread
>
0
)
{
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
pBuf
->
len
+=
nread
;
if
(
isReadAll
(
pBuf
))
{
if
(
readComplete
(
pBuf
))
{
tDebug
(
"alread read complete packet"
);
tDebug
(
"alread read complete packet"
);
uvProcessData
(
ctx
);
uvProcessData
(
ctx
);
}
else
{
}
else
{
...
@@ -442,7 +297,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
...
@@ -442,7 +297,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
}
}
void
uvOnWriteCb
(
uv_write_t
*
req
,
int
status
)
{
void
uvOnWriteCb
(
uv_write_t
*
req
,
int
status
)
{
S
Rpc
Conn
*
conn
=
req
->
data
;
SConn
*
conn
=
req
->
data
;
if
(
status
==
0
)
{
if
(
status
==
0
)
{
tDebug
(
"data already was written on stream"
);
tDebug
(
"data already was written on stream"
);
}
else
{
}
else
{
...
@@ -452,15 +307,15 @@ void uvOnWriteCb(uv_write_t* req, int status) {
...
@@ -452,15 +307,15 @@ void uvOnWriteCb(uv_write_t* req, int status) {
}
}
void
uvWorkerAsyncCb
(
uv_async_t
*
handle
)
{
void
uvWorkerAsyncCb
(
uv_async_t
*
handle
)
{
S
ThreadObj
*
pThrd
=
container_of
(
handle
,
SThrea
dObj
,
workerAsync
);
S
WorkThrdObj
*
pThrd
=
container_of
(
handle
,
SWorkThr
dObj
,
workerAsync
);
S
RpcConn
*
conn
=
NULL
;
S
Conn
*
conn
=
NULL
;
// opt later
// opt later
pthread_mutex_lock
(
&
pThrd
->
connMtx
);
pthread_mutex_lock
(
&
pThrd
->
connMtx
);
if
(
!
QUEUE_IS_EMPTY
(
&
pThrd
->
conn
))
{
if
(
!
QUEUE_IS_EMPTY
(
&
pThrd
->
conn
))
{
queue
*
head
=
QUEUE_HEAD
(
&
pThrd
->
conn
);
queue
*
head
=
QUEUE_HEAD
(
&
pThrd
->
conn
);
conn
=
QUEUE_DATA
(
head
,
S
Rpc
Conn
,
queue
);
conn
=
QUEUE_DATA
(
head
,
SConn
,
queue
);
QUEUE_REMOVE
(
&
conn
->
queue
);
QUEUE_REMOVE
(
head
);
}
}
pthread_mutex_unlock
(
&
pThrd
->
connMtx
);
pthread_mutex_unlock
(
&
pThrd
->
connMtx
);
if
(
conn
==
NULL
)
{
if
(
conn
==
NULL
)
{
...
@@ -507,7 +362,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
...
@@ -507,7 +362,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
assert
(
buf
->
base
[
0
]
==
notify
[
0
]);
assert
(
buf
->
base
[
0
]
==
notify
[
0
]);
free
(
buf
->
base
);
free
(
buf
->
base
);
S
Threa
dObj
*
pThrd
=
q
->
data
;
S
WorkThr
dObj
*
pThrd
=
q
->
data
;
uv_pipe_t
*
pipe
=
(
uv_pipe_t
*
)
q
;
uv_pipe_t
*
pipe
=
(
uv_pipe_t
*
)
q
;
if
(
!
uv_pipe_pending_count
(
pipe
))
{
if
(
!
uv_pipe_pending_count
(
pipe
))
{
...
@@ -518,14 +373,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
...
@@ -518,14 +373,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_handle_type
pending
=
uv_pipe_pending_type
(
pipe
);
uv_handle_type
pending
=
uv_pipe_pending_type
(
pipe
);
assert
(
pending
==
UV_TCP
);
assert
(
pending
==
UV_TCP
);
S
Rpc
Conn
*
pConn
=
connCreate
();
SConn
*
pConn
=
connCreate
();
pConn
->
shandle
=
pThrd
->
shandle
;
pConn
->
shandle
=
pThrd
->
shandle
;
/* init conn timer*/
/* init conn timer*/
pConn
->
pTimer
=
malloc
(
sizeof
(
uv_timer_t
));
pConn
->
pTimer
=
malloc
(
sizeof
(
uv_timer_t
));
uv_timer_init
(
pThrd
->
loop
,
pConn
->
pTimer
);
uv_timer_init
(
pThrd
->
loop
,
pConn
->
pTimer
);
pConn
->
pTimer
->
data
=
pConn
;
pConn
->
pTimer
->
data
=
pConn
;
pConn
->
hostThr
ea
d
=
pThrd
;
pConn
->
hostThrd
=
pThrd
;
pConn
->
pWorkerAsync
=
pThrd
->
workerAsync
;
// thread safty
pConn
->
pWorkerAsync
=
pThrd
->
workerAsync
;
// thread safty
// init client handle
// init client handle
...
@@ -564,17 +419,19 @@ void* acceptThread(void* arg) {
...
@@ -564,17 +419,19 @@ void* acceptThread(void* arg) {
uv_run
(
srv
->
loop
,
UV_RUN_DEFAULT
);
uv_run
(
srv
->
loop
,
UV_RUN_DEFAULT
);
}
}
void
*
workerThread
(
void
*
arg
)
{
void
*
workerThread
(
void
*
arg
)
{
S
ThreadObj
*
pThrd
=
(
SThrea
dObj
*
)
arg
;
S
WorkThrdObj
*
pThrd
=
(
SWorkThr
dObj
*
)
arg
;
pThrd
->
loop
=
(
uv_loop_t
*
)
malloc
(
sizeof
(
uv_loop_t
));
pThrd
->
loop
=
(
uv_loop_t
*
)
malloc
(
sizeof
(
uv_loop_t
));
uv_loop_init
(
pThrd
->
loop
);
uv_loop_init
(
pThrd
->
loop
);
// SRpcInfo* pRpc = pThrd->shandle;
uv_pipe_init
(
pThrd
->
loop
,
pThrd
->
pipe
,
1
);
uv_pipe_init
(
pThrd
->
loop
,
pThrd
->
pipe
,
1
);
uv_pipe_open
(
pThrd
->
pipe
,
pThrd
->
fd
);
uv_pipe_open
(
pThrd
->
pipe
,
pThrd
->
fd
);
pThrd
->
pipe
->
data
=
pThrd
;
pThrd
->
pipe
->
data
=
pThrd
;
QUEUE_INIT
(
&
pThrd
->
conn
);
QUEUE_INIT
(
&
pThrd
->
conn
);
pthread_mutex_init
(
&
pThrd
->
connMtx
,
NULL
);
pThrd
->
workerAsync
=
malloc
(
sizeof
(
uv_async_t
));
pThrd
->
workerAsync
=
malloc
(
sizeof
(
uv_async_t
));
uv_async_init
(
pThrd
->
loop
,
pThrd
->
workerAsync
,
uvWorkerAsyncCb
);
uv_async_init
(
pThrd
->
loop
,
pThrd
->
workerAsync
,
uvWorkerAsyncCb
);
...
@@ -582,11 +439,12 @@ void* workerThread(void* arg) {
...
@@ -582,11 +439,12 @@ void* workerThread(void* arg) {
uv_read_start
((
uv_stream_t
*
)
pThrd
->
pipe
,
uvAllocConnBufferCb
,
uvOnConnectionCb
);
uv_read_start
((
uv_stream_t
*
)
pThrd
->
pipe
,
uvAllocConnBufferCb
,
uvOnConnectionCb
);
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
}
}
static
SRpcConn
*
connCreate
()
{
SRpcConn
*
pConn
=
(
SRpcConn
*
)
calloc
(
1
,
sizeof
(
SRpcConn
));
static
SConn
*
connCreate
()
{
SConn
*
pConn
=
(
SConn
*
)
calloc
(
1
,
sizeof
(
SConn
));
return
pConn
;
return
pConn
;
}
}
static
void
connDestroy
(
S
Rpc
Conn
*
conn
)
{
static
void
connDestroy
(
SConn
*
conn
)
{
if
(
conn
==
NULL
)
{
if
(
conn
==
NULL
)
{
return
;
return
;
}
}
...
@@ -600,78 +458,10 @@ static void connDestroy(SRpcConn* conn) {
...
@@ -600,78 +458,10 @@ static void connDestroy(SRpcConn* conn) {
// handle
// handle
}
}
static
void
uvConnDestroy
(
uv_handle_t
*
handle
)
{
static
void
uvConnDestroy
(
uv_handle_t
*
handle
)
{
S
Rpc
Conn
*
conn
=
handle
->
data
;
SConn
*
conn
=
handle
->
data
;
connDestroy
(
conn
);
connDestroy
(
conn
);
}
}
void
*
rpcOpen
(
const
SRpcInit
*
pInit
)
{
static
int
rpcAddAuthPart
(
SConn
*
pConn
,
char
*
msg
,
int
msgLen
)
{
SRpcInfo
*
pRpc
=
calloc
(
1
,
sizeof
(
SRpcInfo
));
if
(
pRpc
==
NULL
)
{
return
NULL
;
}
if
(
pInit
->
label
)
{
tstrncpy
(
pRpc
->
label
,
pInit
->
label
,
strlen
(
pInit
->
label
));
}
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
pRpc
->
connType
=
pInit
->
connType
;
pRpc
->
tcphandle
=
(
*
taosHandle
[
pRpc
->
connType
])(
0
,
pInit
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
NULL
,
pRpc
);
// pRpc->taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
return
pRpc
;
}
void
rpcClose
(
void
*
arg
)
{
return
;
}
void
*
rpcMallocCont
(
int
contLen
)
{
return
NULL
;
}
void
rpcFreeCont
(
void
*
cont
)
{
return
;
}
void
*
rpcReallocCont
(
void
*
ptr
,
int
contLen
)
{
return
NULL
;
}
void
rpcSendRequest
(
void
*
thandle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
int64_t
*
rid
)
{
// impl later
return
;
}
void
rpcSendResponse
(
const
SRpcMsg
*
pMsg
)
{
SRpcConn
*
pConn
=
pMsg
->
handle
;
SThreadObj
*
pThrd
=
pConn
->
hostThread
;
// opt later
pthread_mutex_lock
(
&
pThrd
->
connMtx
);
QUEUE_PUSH
(
&
pThrd
->
conn
,
&
pConn
->
queue
);
pthread_mutex_unlock
(
&
pThrd
->
connMtx
);
uv_async_send
(
pConn
->
pWorkerAsync
);
}
void
rpcSendRedirectRsp
(
void
*
pConn
,
const
SEpSet
*
pEpSet
)
{}
int
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
)
{
return
-
1
;
}
void
rpcSendRecv
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
)
{
return
;
}
int
rpcReportProgress
(
void
*
pConn
,
char
*
pCont
,
int
contLen
)
{
return
-
1
;
}
void
rpcCancelRequest
(
int64_t
rid
)
{
return
;
}
static
int
rpcAuthenticateMsg
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
)
{
T_MD5_CTX
context
;
int
ret
=
-
1
;
tMD5Init
(
&
context
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_PASSWORD_LEN
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
pMsg
,
msgLen
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_PASSWORD_LEN
);
tMD5Final
(
&
context
);
if
(
memcmp
(
context
.
digest
,
pAuth
,
sizeof
(
context
.
digest
))
==
0
)
ret
=
0
;
return
ret
;
}
static
void
rpcBuildAuthHead
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
)
{
T_MD5_CTX
context
;
tMD5Init
(
&
context
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_PASSWORD_LEN
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
pMsg
,
msgLen
);
tMD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_PASSWORD_LEN
);
tMD5Final
(
&
context
);
memcpy
(
pAuth
,
context
.
digest
,
sizeof
(
context
.
digest
));
}
static
int
rpcAddAuthPart
(
SRpcConn
*
pConn
,
char
*
msg
,
int
msgLen
)
{
SRpcHead
*
pHead
=
(
SRpcHead
*
)
msg
;
SRpcHead
*
pHead
=
(
SRpcHead
*
)
msg
;
if
(
pConn
->
spi
&&
pConn
->
secured
==
0
)
{
if
(
pConn
->
spi
&&
pConn
->
secured
==
0
)
{
...
@@ -690,84 +480,61 @@ static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen) {
...
@@ -690,84 +480,61 @@ static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen) {
return
msgLen
;
return
msgLen
;
}
}
static
int32_t
rpcCompressRpcMsg
(
char
*
pCont
,
int32_t
contLen
)
{
void
*
taosInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SRpcHead
*
pHead
=
rpcHeadFromCont
(
pCont
);
SServerObj
*
srv
=
calloc
(
1
,
sizeof
(
SServerObj
));
int32_t
finalLen
=
0
;
srv
->
loop
=
(
uv_loop_t
*
)
malloc
(
sizeof
(
uv_loop_t
));
int
overhead
=
sizeof
(
SRpcComp
);
srv
->
numOfThreads
=
numOfThreads
;
srv
->
workerIdx
=
0
;
srv
->
pThreadObj
=
(
SWorkThrdObj
**
)
calloc
(
srv
->
numOfThreads
,
sizeof
(
SWorkThrdObj
*
));
srv
->
pipe
=
(
uv_pipe_t
**
)
calloc
(
srv
->
numOfThreads
,
sizeof
(
uv_pipe_t
*
));
srv
->
ip
=
ip
;
srv
->
port
=
port
;
uv_loop_init
(
srv
->
loop
);
if
(
!
NEEDTO_COMPRESSS_MSG
(
contLen
))
{
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
return
contLen
;
SWorkThrdObj
*
thrd
=
(
SWorkThrdObj
*
)
calloc
(
1
,
sizeof
(
SWorkThrdObj
));
srv
->
pipe
[
i
]
=
(
uv_pipe_t
*
)
calloc
(
2
,
sizeof
(
uv_pipe_t
));
int
fds
[
2
];
if
(
uv_socketpair
(
AF_UNIX
,
SOCK_STREAM
,
fds
,
UV_NONBLOCK_PIPE
,
UV_NONBLOCK_PIPE
)
!=
0
)
{
return
NULL
;
}
}
uv_pipe_init
(
srv
->
loop
,
&
(
srv
->
pipe
[
i
][
0
]),
1
);
uv_pipe_open
(
&
(
srv
->
pipe
[
i
][
0
]),
fds
[
1
]);
// init write
char
*
buf
=
malloc
(
contLen
+
overhead
+
8
);
// 8 extra bytes
thrd
->
shandle
=
shandle
;
if
(
buf
==
NULL
)
{
thrd
->
fd
=
fds
[
0
];
tError
(
"failed to allocate memory for rpc msg compression, contLen:%d"
,
contLen
);
thrd
->
pipe
=
&
(
srv
->
pipe
[
i
][
1
]);
// init read
return
contLen
;
int
err
=
pthread_create
(
&
(
thrd
->
thread
),
NULL
,
workerThread
,
(
void
*
)(
thrd
));
if
(
err
==
0
)
{
tDebug
(
"sucess to create worker-thread %d"
,
i
);
// printf("thread %d create\n", i);
}
else
{
// TODO: clear all other resource later
tError
(
"failed to create worker-thread %d"
,
i
);
}
srv
->
pThreadObj
[
i
]
=
thrd
;
}
}
int32_t
compLen
=
LZ4_compress_default
(
pCont
,
buf
,
contLen
,
contLen
+
overhead
);
int
err
=
pthread_create
(
&
srv
->
thread
,
NULL
,
acceptThread
,
(
void
*
)
srv
);
tDebug
(
"compress rpc msg, before:%d, after:%d, overhead:%d"
,
contLen
,
compLen
,
overhead
);
if
(
err
==
0
)
{
tDebug
(
"success to create accept-thread"
);
/*
* only the compressed size is less than the value of contLen - overhead, the compression is applied
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/
if
(
compLen
>
0
&&
compLen
<
contLen
-
overhead
)
{
SRpcComp
*
pComp
=
(
SRpcComp
*
)
pCont
;
pComp
->
reserved
=
0
;
pComp
->
contLen
=
htonl
(
contLen
);
memcpy
(
pCont
+
overhead
,
buf
,
compLen
);
pHead
->
comp
=
1
;
tDebug
(
"compress rpc msg, before:%d, after:%d"
,
contLen
,
compLen
);
finalLen
=
compLen
+
overhead
;
}
else
{
}
else
{
finalLen
=
contLen
;
// clear all resource later
}
}
free
(
buf
);
return
srv
;
return
finalLen
;
}
}
static
SRpcHead
*
rpcDecompressRpcMsg
(
SRpcHead
*
pHead
)
{
void
rpcSendResponse
(
const
SRpcMsg
*
pMsg
)
{
int
overhead
=
sizeof
(
SRpcComp
);
SConn
*
pConn
=
pMsg
->
handle
;
SRpcHead
*
pNewHead
=
NULL
;
SWorkThrdObj
*
pThrd
=
pConn
->
hostThrd
;
uint8_t
*
pCont
=
pHead
->
content
;
SRpcComp
*
pComp
=
(
SRpcComp
*
)
pHead
->
content
;
if
(
pHead
->
comp
)
{
// decompress the content
assert
(
pComp
->
reserved
==
0
);
int
contLen
=
htonl
(
pComp
->
contLen
);
// prepare the temporary buffer to decompress message
char
*
temp
=
(
char
*
)
malloc
(
contLen
+
RPC_MSG_OVERHEAD
);
pNewHead
=
(
SRpcHead
*
)(
temp
+
sizeof
(
SRpcReqContext
));
// reserve SRpcReqContext
if
(
pNewHead
)
{
int
compLen
=
rpcContLenFromMsg
(
pHead
->
msgLen
)
-
overhead
;
int
origLen
=
LZ4_decompress_safe
((
char
*
)(
pCont
+
overhead
),
(
char
*
)
pNewHead
->
content
,
compLen
,
contLen
);
assert
(
origLen
==
contLen
);
memcpy
(
pNewHead
,
pHead
,
sizeof
(
SRpcHead
));
pNewHead
->
msgLen
=
rpcMsgLenFromCont
(
origLen
);
/// rpcFreeMsg(pHead); // free the compressed message buffer
pHead
=
pNewHead
;
tTrace
(
"decomp malloc mem:%p"
,
temp
);
}
else
{
tError
(
"failed to allocate memory to decompress msg, contLen:%d"
,
contLen
);
}
}
return
pHead
;
// opt later
}
pthread_mutex_lock
(
&
pThrd
->
connMtx
);
int32_t
rpcInit
(
void
)
{
QUEUE_PUSH
(
&
pThrd
->
conn
,
&
pConn
->
queue
);
// impl later
pthread_mutex_unlock
(
&
pThrd
->
connMtx
);
return
-
1
;
}
void
rpcCleanup
(
void
)
{
uv_async_send
(
pConn
->
pWorkerAsync
);
// impl later
return
;
}
}
#endif
#endif
source/libs/transport/test/rclient.c
浏览文件 @
47c3361e
...
@@ -34,7 +34,8 @@ typedef struct {
...
@@ -34,7 +34,8 @@ typedef struct {
static
void
processResponse
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
static
void
processResponse
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SInfo
*
pInfo
=
(
SInfo
*
)
pMsg
->
ahandle
;
SInfo
*
pInfo
=
(
SInfo
*
)
pMsg
->
ahandle
;
tDebug
(
"thread:%d, response is received, type:%d contLen:%d code:0x%x"
,
pInfo
->
index
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pMsg
->
code
);
tDebug
(
"thread:%d, response is received, type:%d contLen:%d code:0x%x"
,
pInfo
->
index
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pMsg
->
code
);
if
(
pEpSet
)
pInfo
->
epSet
=
*
pEpSet
;
if
(
pEpSet
)
pInfo
->
epSet
=
*
pEpSet
;
...
@@ -185,7 +186,8 @@ int main(int argc, char *argv[]) {
...
@@ -185,7 +186,8 @@ int main(int argc, char *argv[]) {
// float usedTime = (endTime - startTime) / 1000.0f; // mseconds
// float usedTime = (endTime - startTime) / 1000.0f; // mseconds
// tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
// tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads);
// tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize);
// tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime,
// msgSize);
int
ch
=
getchar
();
int
ch
=
getchar
();
UNUSED
(
ch
);
UNUSED
(
ch
);
...
...
source/libs/transport/test/transportTests.cc
浏览文件 @
47c3361e
...
@@ -22,6 +22,7 @@
...
@@ -22,6 +22,7 @@
#include <thread>
#include <thread>
#include <vector>
#include <vector>
#include "transComm.h"
#include "transportInt.h"
#include "transportInt.h"
#include "trpc.h"
#include "trpc.h"
...
@@ -46,7 +47,7 @@ class QueueObj {
...
@@ -46,7 +47,7 @@ class QueueObj {
if
(
!
IsEmpty
())
{
if
(
!
IsEmpty
())
{
queue
*
h
=
QUEUE_HEAD
(
&
head
);
queue
*
h
=
QUEUE_HEAD
(
&
head
);
el
=
QUEUE_DATA
(
h
,
QueueElem
,
q
);
el
=
QUEUE_DATA
(
h
,
QueueElem
,
q
);
QUEUE_REMOVE
(
&
el
->
q
);
QUEUE_REMOVE
(
h
);
}
}
return
el
;
return
el
;
}
}
...
...
tests/script/sh/massiveTable/deployCluster.sh
浏览文件 @
47c3361e
...
@@ -8,17 +8,18 @@ set -e
...
@@ -8,17 +8,18 @@ set -e
# deployCluster.sh
# deployCluster.sh
curr_dir
=
$(
pwd
)
curr_dir
=
$(
pwd
)
echo
"currect pwd:
${
curr_dir
}
"
source
./cleanCluster.sh
-r
/data
./cleanCluster.sh
-r
"/data"
source
./cleanCluster.sh
-r
/data2
./cleanCluster.sh
-r
"/data2"
source
./compileVersion.sh
-r
${
curr_dir
}
/../../../../
-v
"3.0"
./compileVersion.sh
-r
${
curr_dir
}
/../../../../
-v
"3.0"
source
./setupDnodes.sh
-r
/data
-n
1
-f
trd02:7000
-p
7000
./setupDnodes.sh
-r
"/data"
-n
1
-f
"trd02:7000"
-p
7000
source
./setupDnodes.sh
-r
/data2
-n
1
-f
trd02:7000
-p
8000
./setupDnodes.sh
-r
"/data2"
-n
1
-f
"trd02:7000"
-p
8000
#
source ./setupDnodes.sh -r /data
-n 2 -f trd02:7000 -p 7000
#
./setupDnodes.sh -r "/data"
-n 2 -f trd02:7000 -p 7000
#
source ./setupDnodes.sh -r /data2
-n 2 -f trd02:7000 -p 8000
#
./setupDnodes.sh -r "/data2"
-n 2 -f trd02:7000 -p 8000
...
...
tests/script/sh/massiveTable/setupDnodes.sh
浏览文件 @
47c3361e
...
@@ -94,7 +94,7 @@ createNewDnodesDataDir() {
...
@@ -94,7 +94,7 @@ createNewDnodesDataDir() {
mkdir
-p
${
dataRootDir
}
/dnode_
${
i
}
/data
mkdir
-p
${
dataRootDir
}
/dnode_
${
i
}
/data
createNewCfgFile
${
dataRootDir
}
/dnode_
${
i
}
/cfg
${
dataRootDir
}
/dnode_
${
i
}
/data
${
dataRootDir
}
/dnode_
${
i
}
/log
${
firstEp
}
${
serverPort
}
createNewCfgFile
${
dataRootDir
}
/dnode_
${
i
}
/cfg
${
dataRootDir
}
/dnode_
${
i
}
/data
${
dataRootDir
}
/dnode_
${
i
}
/log
${
firstEp
}
${
serverPort
}
echo
"create dnode:
${
serverPort
}
,
${
dataRootDir
}
/dnode_
${
i
}
"
#
echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}"
serverPort
=
$((
10
#${serverPort}+100))
serverPort
=
$((
10
#${serverPort}+100))
done
done
}
}
...
@@ -131,6 +131,7 @@ fi
...
@@ -131,6 +131,7 @@ fi
## start all dnode by nohup
## start all dnode by nohup
startDnodes
${
dnodeNumber
}
startDnodes
${
dnodeNumber
}
echo
" run setupDnodes.sh end !!!"
echo
"====run setupDnodes.sh end===="
echo
" "
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录