Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2271dee6
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2271dee6
编写于
2月 23, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor trans code
上级
3662e6b0
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
169 addition
and
177 deletion
+169
-177
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+3
-0
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+93
-96
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+59
-51
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+3
-2
source/libs/transport/inc/transportInt.h
source/libs/transport/inc/transportInt.h
+1
-0
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+5
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+5
-10
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+0
-18
未找到文件。
include/libs/transport/trpc.h
浏览文件 @
2271dee6
...
...
@@ -78,6 +78,9 @@ typedef struct SRpcInit {
// call back to retrieve the client auth info, for server app only
int
(
*
afp
)(
void
*
parent
,
char
*
tableId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
// call back to keep conn or not
bool
(
*
pfp
)(
void
*
parent
,
tmsg_t
msgType
);
void
*
parent
;
}
SRpcInit
;
...
...
source/client/inc/clientInt.h
浏览文件 @
2271dee6
...
...
@@ -20,17 +20,17 @@
extern
"C"
{
#endif
#include "taos.h"
#include "common.h"
#include "tmsg.h"
#include "parser.h"
#include "query.h"
#include "taos.h"
#include "tdef.h"
#include "tep.h"
#include "thash.h"
#include "tlist.h"
#include "tmsg.h"
#include "tmsgtype.h"
#include "trpc.h"
#include "query.h"
#include "parser.h"
#define CHECK_CODE_GOTO(expr, label) \
do { \
...
...
@@ -46,12 +46,12 @@ extern "C" {
typedef
struct
SAppInstInfo
SAppInstInfo
;
typedef
struct
SHbConnInfo
{
void
*
param
;
SClientHbReq
*
req
;
void
*
param
;
SClientHbReq
*
req
;
}
SHbConnInfo
;
typedef
struct
SAppHbMgr
{
char
*
key
;
char
*
key
;
// statistics
int32_t
reportCnt
;
int32_t
connKeyCnt
;
...
...
@@ -62,15 +62,13 @@ typedef struct SAppHbMgr {
// connection
SAppInstInfo
*
pAppInstInfo
;
// info
SHashObj
*
activeInfo
;
// hash<SClientHbKey, SClientHbReq>
SHashObj
*
connInfo
;
// hash<SClientHbKey, SHbConnInfo>
SHashObj
*
activeInfo
;
// hash<SClientHbKey, SClientHbReq>
SHashObj
*
connInfo
;
// hash<SClientHbKey, SHbConnInfo>
}
SAppHbMgr
;
typedef
int32_t
(
*
FHbRspHandle
)(
struct
SAppHbMgr
*
pAppHbMgr
,
SClientHbRsp
*
pRsp
);
typedef
int32_t
(
*
FHbRspHandle
)(
struct
SAppHbMgr
*
pAppHbMgr
,
SClientHbRsp
*
pRsp
);
typedef
int32_t
(
*
FHbReqHandle
)(
SClientHbKey
*
connKey
,
void
*
param
,
SClientHbReq
*
req
);
typedef
int32_t
(
*
FHbReqHandle
)(
SClientHbKey
*
connKey
,
void
*
param
,
SClientHbReq
*
req
);
typedef
struct
SClientHbMgr
{
int8_t
inited
;
...
...
@@ -83,63 +81,62 @@ typedef struct SClientHbMgr {
FHbRspHandle
rspHandle
[
HEARTBEAT_TYPE_MAX
];
}
SClientHbMgr
;
typedef
struct
SQueryExecMetric
{
int64_t
start
;
// start timestamp
int64_t
parsed
;
// start to parse
int64_t
send
;
// start to send to server
int64_t
rsp
;
// receive response from server
int64_t
start
;
// start timestamp
int64_t
parsed
;
// start to parse
int64_t
send
;
// start to send to server
int64_t
rsp
;
// receive response from server
}
SQueryExecMetric
;
typedef
struct
SInstanceSummary
{
uint64_t
numOfInsertsReq
;
uint64_t
numOfInsertRows
;
uint64_t
insertElapsedTime
;
uint64_t
insertBytes
;
// submit to tsdb since launched.
uint64_t
fetchBytes
;
uint64_t
queryElapsedTime
;
uint64_t
numOfSlowQueries
;
uint64_t
totalRequests
;
uint64_t
currentRequests
;
// the number of SRequestObj
uint64_t
numOfInsertsReq
;
uint64_t
numOfInsertRows
;
uint64_t
insertElapsedTime
;
uint64_t
insertBytes
;
// submit to tsdb since launched.
uint64_t
fetchBytes
;
uint64_t
queryElapsedTime
;
uint64_t
numOfSlowQueries
;
uint64_t
totalRequests
;
uint64_t
currentRequests
;
// the number of SRequestObj
}
SInstanceSummary
;
typedef
struct
SHeartBeatInfo
{
void
*
pTimer
;
// timer, used to send request msg to mnode
void
*
pTimer
;
// timer, used to send request msg to mnode
}
SHeartBeatInfo
;
struct
SAppInstInfo
{
int64_t
numOfConns
;
SCorEpSet
mgmtEp
;
SInstanceSummary
summary
;
SList
*
pConnList
;
// STscObj linked list
int64_t
clusterId
;
void
*
pTransporter
;
struct
SAppHbMgr
*
pAppHbMgr
;
int64_t
numOfConns
;
SCorEpSet
mgmtEp
;
SInstanceSummary
summary
;
SList
*
pConnList
;
// STscObj linked list
int64_t
clusterId
;
void
*
pTransporter
;
struct
SAppHbMgr
*
pAppHbMgr
;
};
typedef
struct
SAppInfo
{
int64_t
startTime
;
char
appName
[
TSDB_APP_NAME_LEN
];
char
*
ep
;
char
*
ep
;
int32_t
pid
;
int32_t
numOfThreads
;
SHashObj
*
pInstMap
;
SHashObj
*
pInstMap
;
pthread_mutex_t
mutex
;
}
SAppInfo
;
typedef
struct
STscObj
{
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_PASSWORD_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
char
ver
[
128
];
int32_t
acctId
;
uint32_t
connId
;
int32_t
connType
;
uint64_t
id
;
// ref ID returned by taosAddRef
pthread_mutex_t
mutex
;
// used to protect the operation on db
int32_t
numOfReqs
;
// number of sqlObj bound to this connection
SAppInstInfo
*
pAppInfo
;
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_PASSWORD_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
char
ver
[
128
];
int32_t
acctId
;
uint32_t
connId
;
int32_t
connType
;
uint64_t
id
;
// ref ID returned by taosAddRef
pthread_mutex_t
mutex
;
// used to protect the operation on db
int32_t
numOfReqs
;
// number of sqlObj bound to this connection
SAppInstInfo
*
pAppInfo
;
}
STscObj
;
typedef
struct
SMqConsumer
{
...
...
@@ -147,49 +144,49 @@ typedef struct SMqConsumer {
}
SMqConsumer
;
typedef
struct
SReqResultInfo
{
const
char
*
pRspMsg
;
const
char
*
pData
;
TAOS_FIELD
*
fields
;
uint32_t
numOfCols
;
int32_t
*
length
;
TAOS_ROW
row
;
char
**
pCol
;
uint32_t
numOfRows
;
uint64_t
totalRows
;
uint32_t
current
;
bool
completed
;
const
char
*
pRspMsg
;
const
char
*
pData
;
TAOS_FIELD
*
fields
;
uint32_t
numOfCols
;
int32_t
*
length
;
TAOS_ROW
row
;
char
**
pCol
;
uint32_t
numOfRows
;
uint64_t
totalRows
;
uint32_t
current
;
bool
completed
;
}
SReqResultInfo
;
typedef
struct
SShowReqInfo
{
int64_t
execId
;
// showId/queryId
int32_t
vgId
;
SArray
*
pArray
;
// SArray<SVgroupInfo>
int32_t
currentIndex
;
// current accessed vgroup index.
int64_t
execId
;
// showId/queryId
int32_t
vgId
;
SArray
*
pArray
;
// SArray<SVgroupInfo>
int32_t
currentIndex
;
// current accessed vgroup index.
}
SShowReqInfo
;
typedef
struct
SRequestSendRecvBody
{
tsem_t
rspSem
;
// not used now
tsem_t
rspSem
;
// not used now
void
*
fp
;
SShowReqInfo
showInfo
;
// todo this attribute will be removed after the query framework being completed.
SShowReqInfo
showInfo
;
// todo this attribute will be removed after the query framework being completed.
SDataBuf
requestMsg
;
struct
SSchJob
*
pQueryJob
;
// query job, created according to sql query DAG.
struct
SQueryDag
*
pDag
;
// the query dag, generated according to the sql statement.
struct
SSchJob
*
pQueryJob
;
// query job, created according to sql query DAG.
struct
SQueryDag
*
pDag
;
// the query dag, generated according to the sql statement.
SReqResultInfo
resInfo
;
}
SRequestSendRecvBody
;
#define ERROR_MSG_BUF_DEFAULT_SIZE
512
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
typedef
struct
SRequestObj
{
uint64_t
requestId
;
int32_t
type
;
// request type
STscObj
*
pTscObj
;
char
*
sqlstr
;
// sql string
int32_t
sqlLen
;
int64_t
self
;
char
*
msgBuf
;
void
*
pInfo
;
// sql parse info, generated by parser module
int32_t
code
;
SQueryExecMetric
metric
;
uint64_t
requestId
;
int32_t
type
;
// request type
STscObj
*
pTscObj
;
char
*
sqlstr
;
// sql string
int32_t
sqlLen
;
int64_t
self
;
char
*
msgBuf
;
void
*
pInfo
;
// sql parse info, generated by parser module
int32_t
code
;
SQueryExecMetric
metric
;
SRequestSendRecvBody
body
;
}
SRequestObj
;
...
...
@@ -198,51 +195,52 @@ extern int32_t clientReqRefPool;
extern
int32_t
clientConnRefPool
;
extern
int
(
*
handleRequestRspFp
[
TDMT_MAX
])(
void
*
,
const
SDataBuf
*
pMsg
,
int32_t
code
);
int
genericRspCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
);
int
genericRspCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
);
SMsgSendInfo
*
buildMsgInfoImpl
(
SRequestObj
*
pReqObj
);
int
taos_init
();
int
taos_init
();
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
SAppInstInfo
*
pAppInfo
);
void
destroyTscObj
(
void
*
pObj
);
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
SAppInstInfo
*
pAppInfo
);
void
destroyTscObj
(
void
*
pObj
);
uint64_t
generateRequestId
();
void
*
createRequest
(
STscObj
*
pObj
,
__taos_async_fn_t
fp
,
void
*
param
,
int32_t
type
);
void
*
createRequest
(
STscObj
*
pObj
,
__taos_async_fn_t
fp
,
void
*
param
,
int32_t
type
);
void
destroyRequest
(
SRequestObj
*
pRequest
);
char
*
getDbOfConnection
(
STscObj
*
pObj
);
char
*
getDbOfConnection
(
STscObj
*
pObj
);
void
setConnectionDB
(
STscObj
*
pTscObj
,
const
char
*
db
);
void
taos_init_imp
(
void
);
int
taos_options_imp
(
TSDB_OPTION
option
,
const
char
*
str
);
int
taos_options_imp
(
TSDB_OPTION
option
,
const
char
*
str
);
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
,
int32_t
numOfThreads
);
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
,
int32_t
numOfThreads
);
bool
persistConnForSpecificMsg
(
void
*
parenct
,
tmsg_t
msgType
);
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
initMsgHandleFp
();
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
void
*
doFetchRow
(
SRequestObj
*
pRequest
);
TAOS
*
taos_connect_internal
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
void
setResultDataPtr
(
SReqResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
);
void
*
doFetchRow
(
SRequestObj
*
pRequest
);
void
setResultDataPtr
(
SReqResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
);
int32_t
buildRequest
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
,
SRequestObj
**
pRequest
);
int32_t
buildRequest
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
,
SRequestObj
**
pRequest
);
int32_t
parseSql
(
SRequestObj
*
pRequest
,
SQueryNode
**
pQuery
);
// --- heartbeat
// --- heartbeat
// global, called by mgmt
int
hbMgrInit
();
void
hbMgrCleanUp
();
int
hbHandleRsp
(
SClientHbBatchRsp
*
hbRsp
);
// cluster level
SAppHbMgr
*
appHbMgrInit
(
SAppInstInfo
*
pAppInstInfo
,
char
*
key
);
void
appHbMgrCleanup
(
void
);
SAppHbMgr
*
appHbMgrInit
(
SAppInstInfo
*
pAppInstInfo
,
char
*
key
);
void
appHbMgrCleanup
(
void
);
// conn level
int
hbRegisterConn
(
SAppHbMgr
*
pAppHbMgr
,
int32_t
connId
,
int64_t
clusterId
,
int32_t
hbType
);
...
...
@@ -253,7 +251,6 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
// --- mq
void
hbMgrInitMqHbRspHandle
();
#ifdef __cplusplus
}
#endif
...
...
source/client/src/clientEnv.c
浏览文件 @
2271dee6
...
...
@@ -13,16 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "catalog.h"
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
#include "query.h"
#include "scheduler.h"
#include "tmsg.h"
#include "tcache.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tnote.h"
#include "tref.h"
#include "trpc.h"
...
...
@@ -30,16 +30,16 @@
#include "ttimezone.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED
0
#define TSC_VAR_RELEASED 0
SAppInfo
appInfo
;
int32_t
clientReqRefPool
=
-
1
;
int32_t
clientConnRefPool
=
-
1
;
SAppInfo
appInfo
;
int32_t
clientReqRefPool
=
-
1
;
int32_t
clientConnRefPool
=
-
1
;
static
pthread_once_t
tscinit
=
PTHREAD_ONCE_INIT
;
volatile
int32_t
tscInitRes
=
0
;
volatile
int32_t
tscInitRes
=
0
;
static
void
registerRequest
(
SRequestObj
*
pRequest
)
{
static
void
registerRequest
(
SRequestObj
*
pRequest
)
{
STscObj
*
pTscObj
=
(
STscObj
*
)
taosAcquireRef
(
clientConnRefPool
,
pRequest
->
pTscObj
->
id
);
assert
(
pTscObj
!=
NULL
);
...
...
@@ -53,23 +53,25 @@ static void registerRequest(SRequestObj* pRequest) {
int32_t
total
=
atomic_add_fetch_32
(
&
pSummary
->
totalRequests
,
1
);
int32_t
currentInst
=
atomic_add_fetch_32
(
&
pSummary
->
currentRequests
,
1
);
tscDebug
(
"0x%"
PRIx64
" new Request from connObj:0x%"
PRIx64
", current:%d, app current:%d, total:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pRequest
->
pTscObj
->
id
,
num
,
currentInst
,
total
,
pRequest
->
requestId
);
tscDebug
(
"0x%"
PRIx64
" new Request from connObj:0x%"
PRIx64
", current:%d, app current:%d, total:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pRequest
->
pTscObj
->
id
,
num
,
currentInst
,
total
,
pRequest
->
requestId
);
}
}
static
void
deregisterRequest
(
SRequestObj
*
pRequest
)
{
static
void
deregisterRequest
(
SRequestObj
*
pRequest
)
{
assert
(
pRequest
!=
NULL
);
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SInstanceSummary
*
pActivity
=
&
pTscObj
->
pAppInfo
->
summary
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SInstanceSummary
*
pActivity
=
&
pTscObj
->
pAppInfo
->
summary
;
int32_t
currentInst
=
atomic_sub_fetch_32
(
&
pActivity
->
currentRequests
,
1
);
int32_t
num
=
atomic_sub_fetch_32
(
&
pTscObj
->
numOfReqs
,
1
);
int64_t
duration
=
taosGetTimestampMs
()
-
pRequest
->
metric
.
start
;
tscDebug
(
"0x%"
PRIx64
" free Request from connObj: 0x%"
PRIx64
", reqId:0x%"
PRIx64
" elapsed:%"
PRIu64
" ms, current:%d, app current:%d"
,
pRequest
->
self
,
pTscObj
->
id
,
pRequest
->
requestId
,
duration
,
num
,
currentInst
);
tscDebug
(
"0x%"
PRIx64
" free Request from connObj: 0x%"
PRIx64
", reqId:0x%"
PRIx64
" elapsed:%"
PRIu64
" ms, current:%d, app current:%d"
,
pRequest
->
self
,
pTscObj
->
id
,
pRequest
->
requestId
,
duration
,
num
,
currentInst
);
taosReleaseRef
(
clientConnRefPool
,
pTscObj
->
id
);
}
...
...
@@ -79,8 +81,8 @@ static void tscInitLogFile() {
printf
(
"failed to create log dir:%s
\n
"
,
tsLogDir
);
}
const
char
*
defaultLogFileNamePrefix
=
"taoslog"
;
const
int32_t
maxLogFileNum
=
10
;
const
char
*
defaultLogFileNamePrefix
=
"taoslog"
;
const
int32_t
maxLogFileNum
=
10
;
char
temp
[
128
]
=
{
0
};
sprintf
(
temp
,
"%s/%s"
,
tsLogDir
,
defaultLogFileNamePrefix
);
...
...
@@ -90,23 +92,24 @@ static void tscInitLogFile() {
}
// todo close the transporter properly
void
closeTransporter
(
STscObj
*
pTscObj
)
{
void
closeTransporter
(
STscObj
*
pTscObj
)
{
if
(
pTscObj
==
NULL
||
pTscObj
->
pAppInfo
->
pTransporter
==
NULL
)
{
return
;
}
tscDebug
(
"free transporter:%p in connObj: 0x%"
PRIx64
,
pTscObj
->
pAppInfo
->
pTransporter
,
pTscObj
->
id
);
tscDebug
(
"free transporter:%p in connObj: 0x%"
PRIx64
,
pTscObj
->
pAppInfo
->
pTransporter
,
pTscObj
->
id
);
rpcClose
(
pTscObj
->
pAppInfo
->
pTransporter
);
}
// TODO refactor
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
,
int32_t
numOfThread
)
{
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
,
int32_t
numOfThread
)
{
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"TSC"
;
rpcInit
.
numOfThreads
=
numOfThread
;
rpcInit
.
cfp
=
processMsgFromServer
;
rpcInit
.
pfp
=
persistConnForSpecificMsg
;
rpcInit
.
sessions
=
tsMaxConnections
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
user
=
(
char
*
)
user
;
...
...
@@ -115,7 +118,7 @@ void* openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit
.
spi
=
1
;
rpcInit
.
secret
=
(
char
*
)
auth
;
void
*
pDnodeConn
=
rpcOpen
(
&
rpcInit
);
void
*
pDnodeConn
=
rpcOpen
(
&
rpcInit
);
if
(
pDnodeConn
==
NULL
)
{
tscError
(
"failed to init connection to server"
);
return
NULL
;
...
...
@@ -130,12 +133,12 @@ void destroyTscObj(void *pObj) {
SClientHbKey
connKey
=
{.
connId
=
pTscObj
->
connId
,
.
hbType
=
pTscObj
->
connType
};
hbDeregisterConn
(
pTscObj
->
pAppInfo
->
pAppHbMgr
,
connKey
);
atomic_sub_fetch_64
(
&
pTscObj
->
pAppInfo
->
numOfConns
,
1
);
tscDebug
(
"connObj 0x%"
PRIx64
" destroyed, totalConn:%"
PRId64
,
pTscObj
->
id
,
pTscObj
->
pAppInfo
->
numOfConns
);
tscDebug
(
"connObj 0x%"
PRIx64
" destroyed, totalConn:%"
PRId64
,
pTscObj
->
id
,
pTscObj
->
pAppInfo
->
numOfConns
);
pthread_mutex_destroy
(
&
pTscObj
->
mutex
);
tfree
(
pTscObj
);
}
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
SAppInstInfo
*
pAppInfo
)
{
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
SAppInstInfo
*
pAppInfo
)
{
STscObj
*
pObj
=
(
STscObj
*
)
calloc
(
1
,
sizeof
(
STscObj
));
if
(
NULL
==
pObj
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
...
...
@@ -153,11 +156,11 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI
pthread_mutex_init
(
&
pObj
->
mutex
,
NULL
);
pObj
->
id
=
taosAddRef
(
clientConnRefPool
,
pObj
);
tscDebug
(
"connObj created, 0x%"
PRIx64
,
pObj
->
id
);
tscDebug
(
"connObj created, 0x%"
PRIx64
,
pObj
->
id
);
return
pObj
;
}
void
*
createRequest
(
STscObj
*
pObj
,
__taos_async_fn_t
fp
,
void
*
param
,
int32_t
type
)
{
void
*
createRequest
(
STscObj
*
pObj
,
__taos_async_fn_t
fp
,
void
*
param
,
int32_t
type
)
{
assert
(
pObj
!=
NULL
);
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
calloc
(
1
,
sizeof
(
SRequestObj
));
...
...
@@ -166,20 +169,20 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
return
NULL
;
}
pRequest
->
requestId
=
generateRequestId
();
pRequest
->
requestId
=
generateRequestId
();
pRequest
->
metric
.
start
=
taosGetTimestampMs
();
pRequest
->
type
=
type
;
pRequest
->
pTscObj
=
pObj
;
pRequest
->
body
.
fp
=
fp
;
// not used it yet
pRequest
->
msgBuf
=
calloc
(
1
,
ERROR_MSG_BUF_DEFAULT_SIZE
);
pRequest
->
type
=
type
;
pRequest
->
pTscObj
=
pObj
;
pRequest
->
body
.
fp
=
fp
;
// not used it yet
pRequest
->
msgBuf
=
calloc
(
1
,
ERROR_MSG_BUF_DEFAULT_SIZE
);
tsem_init
(
&
pRequest
->
body
.
rspSem
,
0
,
0
);
registerRequest
(
pRequest
);
return
pRequest
;
}
static
void
doFreeReqResultInfo
(
SReqResultInfo
*
pResInfo
)
{
static
void
doFreeReqResultInfo
(
SReqResultInfo
*
pResInfo
)
{
tfree
(
pResInfo
->
pRspMsg
);
tfree
(
pResInfo
->
length
);
tfree
(
pResInfo
->
row
);
...
...
@@ -187,9 +190,9 @@ static void doFreeReqResultInfo(SReqResultInfo* pResInfo) {
tfree
(
pResInfo
->
fields
);
}
static
void
doDestroyRequest
(
void
*
p
)
{
static
void
doDestroyRequest
(
void
*
p
)
{
assert
(
p
!=
NULL
);
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
p
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
p
;
assert
(
RID_VALID
(
pRequest
->
self
));
...
...
@@ -208,7 +211,7 @@ static void doDestroyRequest(void* p) {
tfree
(
pRequest
);
}
void
destroyRequest
(
SRequestObj
*
pRequest
)
{
void
destroyRequest
(
SRequestObj
*
pRequest
)
{
if
(
pRequest
==
NULL
)
{
return
;
}
...
...
@@ -252,14 +255,14 @@ void taos_init_imp(void) {
initTaskQueue
();
clientConnRefPool
=
taosOpenRef
(
200
,
destroyTscObj
);
clientReqRefPool
=
taosOpenRef
(
40960
,
doDestroyRequest
);
clientReqRefPool
=
taosOpenRef
(
40960
,
doDestroyRequest
);
taosGetAppName
(
appInfo
.
appName
,
NULL
);
pthread_mutex_init
(
&
appInfo
.
mutex
,
NULL
);
appInfo
.
pid
=
taosGetPId
();
appInfo
.
pid
=
taosGetPId
();
appInfo
.
startTime
=
taosGetTimestampMs
();
appInfo
.
pInstMap
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
appInfo
.
pInstMap
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
tscDebug
(
"client is initialized successfully"
);
}
...
...
@@ -281,7 +284,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
cfg
->
cfgStatus
=
TAOS_CFG_CSTATUS_OPTION
;
tscInfo
(
"set config file directory:%s"
,
str
);
}
else
{
tscWarn
(
"config option:%s, input value:%s, is configured by %s, use %s"
,
cfg
->
option
,
str
,
tsCfgStatusStr
[
cfg
->
cfgStatus
],
(
char
*
)
cfg
->
ptr
);
tscWarn
(
"config option:%s, input value:%s, is configured by %s, use %s"
,
cfg
->
option
,
str
,
tsCfgStatusStr
[
cfg
->
cfgStatus
],
(
char
*
)
cfg
->
ptr
);
}
break
;
...
...
@@ -296,7 +300,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
cfg
->
cfgStatus
=
TAOS_CFG_CSTATUS_OPTION
;
tscInfo
(
"set shellActivityTimer:%d"
,
tsShellActivityTimer
);
}
else
{
tscWarn
(
"config option:%s, input value:%s, is configured by %s, use %d"
,
cfg
->
option
,
str
,
tsCfgStatusStr
[
cfg
->
cfgStatus
],
*
(
int32_t
*
)
cfg
->
ptr
);
tscWarn
(
"config option:%s, input value:%s, is configured by %s, use %d"
,
cfg
->
option
,
str
,
tsCfgStatusStr
[
cfg
->
cfgStatus
],
*
(
int32_t
*
)
cfg
->
ptr
);
}
break
;
...
...
@@ -313,8 +318,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
if
(
cfg
->
cfgStatus
<=
TAOS_CFG_CSTATUS_OPTION
)
{
char
sep
=
'.'
;
if
(
strlen
(
tsLocale
)
==
0
)
{
// locale does not set yet
char
*
defaultLocale
=
setlocale
(
LC_CTYPE
,
""
);
if
(
strlen
(
tsLocale
)
==
0
)
{
// locale does not set yet
char
*
defaultLocale
=
setlocale
(
LC_CTYPE
,
""
);
// The locale of the current OS does not be set correctly, so the default locale cannot be acquired.
// The launch of current system will abort soon.
...
...
@@ -329,10 +334,10 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
// set the user specified locale
char
*
locale
=
setlocale
(
LC_CTYPE
,
str
);
if
(
locale
!=
NULL
)
{
// failed to set the user specified locale
if
(
locale
!=
NULL
)
{
// failed to set the user specified locale
tscInfo
(
"locale set, prev locale:%s, new locale:%s"
,
tsLocale
,
locale
);
cfg
->
cfgStatus
=
TAOS_CFG_CSTATUS_OPTION
;
}
else
{
// set the user specified locale failed, use default LC_CTYPE as current locale
}
else
{
// set the user specified locale failed, use default LC_CTYPE as current locale
locale
=
setlocale
(
LC_CTYPE
,
tsLocale
);
tscInfo
(
"failed to set locale:%s, current locale:%s"
,
str
,
tsLocale
);
}
...
...
@@ -360,11 +365,12 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
}
free
(
charset
);
}
else
{
// it may be windows system
}
else
{
// it may be windows system
tscInfo
(
"charset remains:%s"
,
tsCharset
);
}
}
else
{
tscWarn
(
"config option:%s, input value:%s, is configured by %s, use %s"
,
cfg
->
option
,
str
,
tsCfgStatusStr
[
cfg
->
cfgStatus
],
(
char
*
)
cfg
->
ptr
);
tscWarn
(
"config option:%s, input value:%s, is configured by %s, use %s"
,
cfg
->
option
,
str
,
tsCfgStatusStr
[
cfg
->
cfgStatus
],
(
char
*
)
cfg
->
ptr
);
}
break
;
}
...
...
@@ -394,7 +400,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
tscInfo
(
"charset:%s not valid"
,
str
);
}
}
else
{
tscWarn
(
"config option:%s, input value:%s, is configured by %s, use %s"
,
cfg
->
option
,
str
,
tsCfgStatusStr
[
cfg
->
cfgStatus
],
(
char
*
)
cfg
->
ptr
);
tscWarn
(
"config option:%s, input value:%s, is configured by %s, use %s"
,
cfg
->
option
,
str
,
tsCfgStatusStr
[
cfg
->
cfgStatus
],
(
char
*
)
cfg
->
ptr
);
}
break
;
...
...
@@ -410,7 +417,8 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
cfg
->
cfgStatus
=
TAOS_CFG_CSTATUS_OPTION
;
tscDebug
(
"timezone set:%s, input:%s by taos_options"
,
tsTimezone
,
str
);
}
else
{
tscWarn
(
"config option:%s, input value:%s, is configured by %s, use %s"
,
cfg
->
option
,
str
,
tsCfgStatusStr
[
cfg
->
cfgStatus
],
(
char
*
)
cfg
->
ptr
);
tscWarn
(
"config option:%s, input value:%s, is configured by %s, use %s"
,
cfg
->
option
,
str
,
tsCfgStatusStr
[
cfg
->
cfgStatus
],
(
char
*
)
cfg
->
ptr
);
}
break
;
...
...
@@ -434,7 +442,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
*/
uint64_t
generateRequestId
()
{
static
uint64_t
hashId
=
0
;
static
int32_t
requestSerialId
=
0
;
static
int32_t
requestSerialId
=
0
;
if
(
hashId
==
0
)
{
char
uid
[
64
]
=
{
0
};
...
...
@@ -448,9 +456,9 @@ uint64_t generateRequestId() {
}
}
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
int32_t
val
=
atomic_add_fetch_32
(
&
requestSerialId
,
1
);
int64_t
ts
=
taosGetTimestampMs
();
uint64_t
pid
=
taosGetPId
();
int32_t
val
=
atomic_add_fetch_32
(
&
requestSerialId
,
1
);
uint64_t
id
=
((
hashId
&
0x0FFF
)
<<
52
)
|
((
pid
&
0x0FFF
)
<<
40
)
|
((
ts
&
0xFFFFFF
)
<<
16
)
|
(
val
&
0xFFFF
);
return
id
;
...
...
source/client/src/clientImpl.c
浏览文件 @
2271dee6
...
...
@@ -370,7 +370,6 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
pMsgSendInfo
->
fp
=
handleRequestRspFp
[
TMSG_INDEX
(
pMsgSendInfo
->
msgType
)];
pMsgSendInfo
->
param
=
pRequest
;
SConnectReq
connectReq
=
{
0
};
STscObj
*
pObj
=
pRequest
->
pTscObj
;
...
...
@@ -398,7 +397,9 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
tfree
(
pMsgBody
->
msgInfo
.
pData
);
tfree
(
pMsgBody
);
}
bool
persistConnForSpecificMsg
(
void
*
parenct
,
tmsg_t
msgType
)
{
return
msgType
==
TDMT_VND_QUERY_RSP
||
msgType
==
TDMT_VND_FETCH_RSP
||
msgType
==
TDMT_VND_RES_READY_RSP
;
}
void
processMsgFromServer
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SMsgSendInfo
*
pSendInfo
=
(
SMsgSendInfo
*
)
pMsg
->
ahandle
;
assert
(
pMsg
->
ahandle
!=
NULL
);
...
...
source/libs/transport/inc/transportInt.h
浏览文件 @
2271dee6
...
...
@@ -66,6 +66,7 @@ typedef struct {
void
(
*
cfp
)(
void
*
parent
,
SRpcMsg
*
,
SEpSet
*
);
int
(
*
afp
)(
void
*
parent
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
bool
(
*
pfp
)(
void
*
parent
,
tmsg_t
msgType
);
int32_t
refCount
;
void
*
parent
;
...
...
source/libs/transport/src/trans.c
浏览文件 @
2271dee6
...
...
@@ -29,7 +29,12 @@ void* rpcOpen(const SRpcInit* pInit) {
if
(
pInit
->
label
)
{
tstrncpy
(
pRpc
->
label
,
pInit
->
label
,
strlen
(
pInit
->
label
)
+
1
);
}
// register callback handle
pRpc
->
cfp
=
pInit
->
cfp
;
pRpc
->
afp
=
pInit
->
afp
;
pRpc
->
pfp
=
pInit
->
pfp
;
if
(
pInit
->
connType
==
TAOS_CONN_SERVER
)
{
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
}
else
{
...
...
source/libs/transport/src/transCli.c
浏览文件 @
2271dee6
...
...
@@ -134,8 +134,7 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg
.
msgType
=
pHead
->
msgType
;
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
if
(
rpcMsg
.
msgType
==
TDMT_VND_QUERY_RSP
||
rpcMsg
.
msgType
==
TDMT_VND_FETCH_RSP
||
rpcMsg
.
msgType
==
TDMT_VND_RES_READY_RSP
)
{
if
(
pRpc
->
pfp
!=
NULL
&&
(
pRpc
->
pfp
)(
pRpc
->
parent
,
rpcMsg
.
msgType
))
{
rpcMsg
.
handle
=
conn
;
conn
->
persist
=
1
;
tDebug
(
"client conn %p persist by app"
,
conn
);
...
...
@@ -185,18 +184,13 @@ static void clientHandleExcept(SCliConn* pConn) {
clientConnDestroy
(
pConn
,
true
);
return
;
}
SCliMsg
*
pMsg
=
pConn
->
data
;
tmsg_t
msgType
=
TDMT_MND_CONNECT
;
if
(
pMsg
!=
NULL
)
{
msgType
=
pMsg
->
msg
.
msgType
;
}
SCliMsg
*
pMsg
=
pConn
->
data
;
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
rpcMsg
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
rpcMsg
.
msgType
=
msgType
+
1
;
rpcMsg
.
msgType
=
pMsg
->
msg
.
msgType
+
1
;
if
(
pConn
->
push
!=
NULL
&&
pConn
->
ctnRdCnt
!=
0
)
{
(
*
pConn
->
push
->
callback
)(
pConn
->
push
->
arg
,
&
rpcMsg
);
...
...
@@ -445,7 +439,7 @@ static void clientConnCb(uv_connect_t* req, int status) {
addrlen
=
sizeof
(
pConn
->
locaddr
);
uv_tcp_getsockname
((
uv_tcp_t
*
)
pConn
->
stream
,
(
struct
sockaddr
*
)
&
pConn
->
locaddr
,
&
addrlen
);
tTrace
(
"client conn %p c
reate
"
,
pConn
);
tTrace
(
"client conn %p c
onnect to server successfully
"
,
pConn
);
assert
(
pConn
->
stream
==
req
->
handle
);
clientWrite
(
pConn
);
...
...
@@ -524,6 +518,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
struct
sockaddr_in
addr
;
uv_ip4_addr
(
pMsg
->
ctx
->
ip
,
pMsg
->
ctx
->
port
,
&
addr
);
// handle error in callback if fail to connect
tTrace
(
"client conn %p try to connect to %s:%d"
,
conn
,
pMsg
->
ctx
->
ip
,
pMsg
->
ctx
->
port
);
uv_tcp_connect
(
&
conn
->
connReq
,
(
uv_tcp_t
*
)(
conn
->
stream
),
(
const
struct
sockaddr
*
)
&
addr
,
clientConnCb
);
}
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
2271dee6
...
...
@@ -413,11 +413,6 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
}
else
{
uvStartSendResp
(
msg
);
}
// uv_buf_t wb;
// uvPrepareSendData(msg, &wb);
// uv_timer_stop(conn->pTimer);
// uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
}
}
static
void
uvAcceptAsyncCb
(
uv_async_t
*
async
)
{
...
...
@@ -490,7 +485,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
pConn
->
pTimer
->
data
=
pConn
;
pConn
->
hostThrd
=
pThrd
;
// pConn->pWorkerAsync = pThrd->workerAsync; // thread safty
// init client handle
pConn
->
pTcp
=
(
uv_tcp_t
*
)
malloc
(
sizeof
(
uv_tcp_t
));
...
...
@@ -730,14 +724,9 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
}
void
sendQuitToWorkThrd
(
SWorkThrdObj
*
pThrd
)
{
SSrvMsg
*
srvMsg
=
calloc
(
1
,
sizeof
(
SSrvMsg
));
// pthread_mutex_lock(&pThrd->msgMtx);
// QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
// pthread_mutex_unlock(&pThrd->msgMtx);
tDebug
(
"send quit msg to work thread"
);
transSendAsync
(
pThrd
->
asyncPool
,
&
srvMsg
->
q
);
// uv_async_send(pThrd->workerAsync);
}
void
taosCloseServer
(
void
*
arg
)
{
...
...
@@ -774,19 +763,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
SSrvMsg
*
srvMsg
=
calloc
(
1
,
sizeof
(
SSrvMsg
));
srvMsg
->
pConn
=
pConn
;
srvMsg
->
msg
=
*
pMsg
;
// pthread_mutex_lock(&pThrd->msgMtx);
// QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
// pthread_mutex_unlock(&pThrd->msgMtx);
tTrace
(
"server conn %p start to send resp"
,
pConn
);
transSendAsync
(
pThrd
->
asyncPool
,
&
srvMsg
->
q
);
// uv_async_send(pThrd->workerAsync);
}
int
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
)
{
SSrvConn
*
pConn
=
thandle
;
// struct sockaddr* pPeerName = &pConn->peername;
struct
sockaddr_in
addr
=
pConn
->
addr
;
pInfo
->
clientIp
=
(
uint32_t
)(
addr
.
sin_addr
.
s_addr
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录