未验证 提交 76571422 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10053 from taosdata/feature/trans

stop client gracefully
...@@ -45,13 +45,13 @@ typedef struct SCliThrdObj { ...@@ -45,13 +45,13 @@ typedef struct SCliThrdObj {
pthread_t thread; pthread_t thread;
uv_loop_t* loop; uv_loop_t* loop;
uv_async_t* cliAsync; // uv_async_t* cliAsync; //
uv_timer_t* pTimer; uv_timer_t* timer;
void* pool; // conn pool void* pool; // conn pool
queue msg; queue msg;
pthread_mutex_t msgMtx; pthread_mutex_t msgMtx;
uint64_t nextTimeout; // next timeout uint64_t nextTimeout; // next timeout
void* pTransInst; // void* pTransInst; //
bool quit;
} SCliThrdObj; } SCliThrdObj;
typedef struct SClientObj { typedef struct SClientObj {
...@@ -94,6 +94,8 @@ static void clientHandleResp(SCliConn* conn); ...@@ -94,6 +94,8 @@ static void clientHandleResp(SCliConn* conn);
static void clientHandleExcept(SCliConn* conn); static void clientHandleExcept(SCliConn* conn);
// handle req from app // handle req from app
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientSendQuit(SCliThrdObj* thrd);
static void destroyUserdata(SRpcMsg* userdata); static void destroyUserdata(SRpcMsg* userdata);
...@@ -136,8 +138,8 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -136,8 +138,8 @@ static void clientHandleResp(SCliConn* conn) {
destroyCmsg(pMsg); destroyCmsg(pMsg);
conn->data = NULL; conn->data = NULL;
// start thread's timer of conn pool if not active // start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
} }
static void clientHandleExcept(SCliConn* pConn) { static void clientHandleExcept(SCliConn* pConn) {
...@@ -155,7 +157,7 @@ static void clientHandleExcept(SCliConn* pConn) { ...@@ -155,7 +157,7 @@ static void clientHandleExcept(SCliConn* pConn) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
rpcMsg.code = -1; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
// SRpcInfo* pRpc = pMsg->ctx->pRpc; // SRpcInfo* pRpc = pMsg->ctx->pRpc;
(pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL); (pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL);
pConn->notifyCount += 1; pConn->notifyCount += 1;
...@@ -332,9 +334,8 @@ static void clientWriteCb(uv_write_t* req, int status) { ...@@ -332,9 +334,8 @@ static void clientWriteCb(uv_write_t* req, int status) {
tDebug("conn %p data already was written out", pConn); tDebug("conn %p data already was written out", pConn);
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
if (pMsg == NULL) { if (pMsg == NULL) {
destroy // handle
// handle return;
return;
} }
destroyUserdata(&pMsg->msg); destroyUserdata(&pMsg->msg);
} else { } else {
...@@ -375,6 +376,15 @@ static void clientConnCb(uv_connect_t* req, int status) { ...@@ -375,6 +376,15 @@ static void clientConnCb(uv_connect_t* req, int status) {
clientWrite(pConn); clientWrite(pConn);
} }
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tDebug("thread %p start to quit", pThrd);
destroyCmsg(pMsg);
uv_close((uv_handle_t*)pThrd->cliAsync, NULL);
uv_timer_stop(pThrd->timer);
pThrd->quit = true;
// uv__async_stop(pThrd->cliAsync);
uv_stop(pThrd->loop);
}
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
uint64_t et = taosGetTimestampUs(); uint64_t et = taosGetTimestampUs();
uint64_t el = et - pMsg->st; uint64_t el = et - pMsg->st;
...@@ -389,7 +399,13 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -389,7 +399,13 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->data = pMsg; conn->data = pMsg;
conn->writeReq->data = conn; conn->writeReq->data = conn;
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
if (pThrd->quit) {
clientHandleExcept(conn);
return;
}
clientWrite(conn); clientWrite(conn);
} else { } else {
SCliConn* conn = calloc(1, sizeof(SCliConn)); SCliConn* conn = calloc(1, sizeof(SCliConn));
conn->ref++; conn->ref++;
...@@ -430,7 +446,12 @@ static void clientAsyncCb(uv_async_t* handle) { ...@@ -430,7 +446,12 @@ static void clientAsyncCb(uv_async_t* handle) {
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
clientHandleReq(pMsg, pThrd); if (pMsg->ctx == NULL) {
clientHandleQuit(pMsg, pThrd);
} else {
clientHandleReq(pMsg, pThrd);
}
// clientHandleReq(pMsg, pThrd);
count++; count++;
} }
if (count >= 2) { if (count >= 2) {
...@@ -458,7 +479,7 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -458,7 +479,7 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd));
if (err == 0) { if (err == 0) {
tDebug("sucess to create tranport-client thread %d", i); tDebug("success to create tranport-client thread %d", i);
} }
cli->pThreadObj[i] = pThrd; cli->pThreadObj[i] = pThrd;
} }
...@@ -492,20 +513,24 @@ static SCliThrdObj* createThrdObj() { ...@@ -492,20 +513,24 @@ static SCliThrdObj* createThrdObj() {
uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb);
pThrd->cliAsync->data = pThrd; pThrd->cliAsync->data = pThrd;
pThrd->pTimer = malloc(sizeof(uv_timer_t)); pThrd->timer = malloc(sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, pThrd->pTimer); uv_timer_init(pThrd->loop, pThrd->timer);
pThrd->pTimer->data = pThrd; pThrd->timer->data = pThrd;
pThrd->pool = creatConnPool(1); pThrd->pool = creatConnPool(1);
pThrd->quit = false;
return pThrd; return pThrd;
} }
static void destroyThrdObj(SCliThrdObj* pThrd) { static void destroyThrdObj(SCliThrdObj* pThrd) {
if (pThrd == NULL) { if (pThrd == NULL) {
return; return;
} }
uv_stop(pThrd->loop);
pthread_join(pThrd->thread, NULL); pthread_join(pThrd->thread, NULL);
pthread_mutex_destroy(&pThrd->msgMtx); pthread_mutex_destroy(&pThrd->msgMtx);
free(pThrd->cliAsync); free(pThrd->cliAsync);
free(pThrd->timer);
free(pThrd->loop); free(pThrd->loop);
free(pThrd); free(pThrd);
} }
...@@ -517,10 +542,22 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { ...@@ -517,10 +542,22 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
free(ctx); free(ctx);
} }
// //
static void clientSendQuit(SCliThrdObj* thrd) {
// cli can stop gracefully
SCliMsg* msg = calloc(1, sizeof(SCliMsg));
msg->ctx = NULL; //
pthread_mutex_lock(&thrd->msgMtx);
QUEUE_PUSH(&thrd->msg, &msg->q);
pthread_mutex_unlock(&thrd->msgMtx);
uv_async_send(thrd->cliAsync);
}
void taosCloseClient(void* arg) { void taosCloseClient(void* arg) {
// impl later // impl later
SClientObj* cli = arg; SClientObj* cli = arg;
for (int i = 0; i < cli->numOfThreads; i++) { for (int i = 0; i < cli->numOfThreads; i++) {
clientSendQuit(cli->pThreadObj[i]);
destroyThrdObj(cli->pThreadObj[i]); destroyThrdObj(cli->pThreadObj[i]);
} }
free(cli->pThreadObj); free(cli->pThreadObj);
......
...@@ -70,6 +70,7 @@ typedef struct SServerObj { ...@@ -70,6 +70,7 @@ typedef struct SServerObj {
uv_pipe_t** pipe; uv_pipe_t** pipe;
uint32_t ip; uint32_t ip;
uint32_t port; uint32_t port;
uv_async_t* pAcceptAsync; // just to quit from from accept thread
} SServerObj; } SServerObj;
static const char* notify = "a"; static const char* notify = "a";
...@@ -88,9 +89,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status); ...@@ -88,9 +89,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status);
static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle); static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg); static void uvStartSendResp(SSrvMsg* msg);
static void destroySmsg(SSrvMsg* smsg); static void destroySmsg(SSrvMsg* smsg);
// check whether already read complete packet // check whether already read complete packet
static bool readComplete(SConnBuffer* buf); static bool readComplete(SConnBuffer* buf);
...@@ -389,7 +392,13 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -389,7 +392,13 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
tError("except occurred, continue"); tError("except occurred, continue");
continue; continue;
} }
uvStartSendResp(msg); if (msg->pConn == NULL) {
//
free(msg);
uv_stop(pThrd->loop);
} else {
uvStartSendResp(msg);
}
// uv_buf_t wb; // uv_buf_t wb;
// uvPrepareSendData(msg, &wb); // uvPrepareSendData(msg, &wb);
// uv_timer_stop(conn->pTimer); // uv_timer_stop(conn->pTimer);
...@@ -397,6 +406,10 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -397,6 +406,10 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
// uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); // uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
} }
} }
static void uvAcceptAsyncCb(uv_async_t* async) {
SServerObj* srv = async->data;
uv_stop(srv->loop);
}
void uvOnAcceptCb(uv_stream_t* stream, int status) { void uvOnAcceptCb(uv_stream_t* stream, int status) {
if (status == -1) { if (status == -1) {
...@@ -517,8 +530,12 @@ static bool addHandleToAcceptloop(void* arg) { ...@@ -517,8 +530,12 @@ static bool addHandleToAcceptloop(void* arg) {
return false; return false;
} }
struct sockaddr_in bind_addr; // register an async here to quit server gracefully
srv->pAcceptAsync = calloc(1, sizeof(uv_async_t));
uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb);
srv->pAcceptAsync->data = srv;
struct sockaddr_in bind_addr;
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) { if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) {
tError("failed to bind: %s", uv_err_name(err)); tError("failed to bind: %s", uv_err_name(err));
...@@ -647,21 +664,42 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { ...@@ -647,21 +664,42 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
return; return;
} }
pthread_join(pThrd->thread, NULL); pthread_join(pThrd->thread, NULL);
// free(srv->pipe[i]);
free(pThrd->loop); free(pThrd->loop);
pthread_mutex_destroy(&pThrd->msgMtx); free(pThrd->workerAsync);
free(pThrd); free(pThrd);
} }
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg));
pthread_mutex_lock(&pThrd->msgMtx);
QUEUE_PUSH(&pThrd->msg, &srvMsg->q);
pthread_mutex_unlock(&pThrd->msgMtx);
tDebug("send quit msg to work thread");
uv_async_send(pThrd->workerAsync);
}
void taosCloseServer(void* arg) { void taosCloseServer(void* arg) {
// impl later // impl later
SServerObj* srv = arg; SServerObj* srv = arg;
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {
sendQuitToWorkThrd(srv->pThreadObj[i]);
destroyWorkThrd(srv->pThreadObj[i]); destroyWorkThrd(srv->pThreadObj[i]);
} }
tDebug("send quit msg to accept thread");
uv_async_send(srv->pAcceptAsync);
pthread_join(srv->thread, NULL);
free(srv->pThreadObj);
free(srv->pAcceptAsync);
free(srv->loop); free(srv->loop);
for (int i = 0; i < srv->numOfThreads; i++) {
free(srv->pipe[i]);
}
free(srv->pipe); free(srv->pipe);
free(srv->pThreadObj);
pthread_join(srv->thread, NULL);
free(srv); free(srv);
} }
......
add_executable(transportTest "") add_executable(transportTest "")
add_executable(client "") add_executable(client "")
add_executable(server "") add_executable(server "")
add_executable(transUT "")
target_sources(transUT
PRIVATE
"transUT.cc"
)
target_sources(transportTest target_sources(transportTest
PRIVATE PRIVATE
...@@ -28,6 +34,13 @@ target_link_libraries (transportTest ...@@ -28,6 +34,13 @@ target_link_libraries (transportTest
gtest_main gtest_main
transport transport
) )
target_link_libraries (transUT
os
util
common
gtest_main
transport
)
target_include_directories(client target_include_directories(client
PUBLIC PUBLIC
...@@ -48,6 +61,13 @@ target_include_directories(server ...@@ -48,6 +61,13 @@ target_include_directories(server
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(transUT
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/transport"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (server target_link_libraries (server
os os
util util
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <cstdio>
#include <cstring>
#include "trpc.h"
using namespace std;
class TransObj {
public:
TransObj() {
const char *label = "APP";
const char *secret = "secret";
const char *user = "user";
const char *ckey = "ckey";
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = (char *)label;
rpcInit.numOfThreads = 5;
rpcInit.cfp = NULL;
rpcInit.sessions = 100;
rpcInit.idleTime = 100;
rpcInit.user = (char *)user;
rpcInit.secret = (char *)secret;
rpcInit.ckey = (char *)ckey;
rpcInit.spi = 1;
}
bool startCli() {
trans = NULL;
rpcInit.connType = TAOS_CONN_CLIENT;
trans = rpcOpen(&rpcInit);
return trans != NULL ? true : false;
}
bool startSrv() {
trans = NULL;
rpcInit.connType = TAOS_CONN_SERVER;
trans = rpcOpen(&rpcInit);
return trans != NULL ? true : false;
}
bool stop() {
rpcClose(trans);
trans = NULL;
return true;
}
private:
void * trans;
SRpcInit rpcInit;
};
class TransEnv : public ::testing::Test {
protected:
virtual void SetUp() {
// set up trans obj
tr = new TransObj();
}
virtual void TearDown() {
// tear down
delete tr;
}
TransObj *tr = NULL;
};
TEST_F(TransEnv, test_start_stop) {
assert(tr->startCli());
assert(tr->stop());
assert(tr->startSrv());
assert(tr->stop());
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册