提交 19686d9a 编写于 作者: dengyihao's avatar dengyihao

update transport

上级 914b5387
...@@ -258,7 +258,7 @@ void checkFstCheckIterator() { ...@@ -258,7 +258,7 @@ void checkFstCheckIterator() {
// prefix search // prefix search
std::vector<uint64_t> result; std::vector<uint64_t> result;
AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS); AutomationCtx* ctx = automCtxCreate((void*)"H", AUTOMATION_PREFIX);
m->Search(ctx, result); m->Search(ctx, result);
std::cout << "size: " << result.size() << std::endl; std::cout << "size: " << result.size() << std::endl;
// assert(result.size() == count); // assert(result.size() == count);
...@@ -328,11 +328,11 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) { ...@@ -328,11 +328,11 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) {
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
// tool to check all kind of fst test // tool to check all kind of fst test
// if (argc > 1) { validateTFile(argv[1]); } // if (argc > 1) { validateTFile(argv[1]); }
if (argc > 4) { // if (argc > 4) {
// path suid colName ver // path suid colName ver
iterTFileReader(argv[1], argv[2], argv[3], argv[4]); // iterTFileReader(argv[1], argv[2], argv[3], argv[4]);
} //}
// checkFstCheckIterator(); checkFstCheckIterator();
// checkFstLongTerm(); // checkFstLongTerm();
// checkFstPrefixSearch(); // checkFstPrefixSearch();
......
...@@ -123,9 +123,8 @@ typedef struct { ...@@ -123,9 +123,8 @@ typedef struct {
} SRpcReqContext; } SRpcReqContext;
typedef struct { typedef struct {
SRpcInfo* pTransInst; // associated SRpcInfo SEpSet epSet; // ip list provided by app
SEpSet epSet; // ip list provided by app void* ahandle; // handle provided by app
void* ahandle; // handle provided by app
// struct SRpcConn* pConn; // pConn allocated // struct SRpcConn* pConn; // pConn allocated
tmsg_t msgType; // message type tmsg_t msgType; // message type
uint8_t* pCont; // content provided by app uint8_t* pCont; // content provided by app
...@@ -244,6 +243,7 @@ int transDestroyBuffer(SConnBuffer* buf); ...@@ -244,6 +243,7 @@ int transDestroyBuffer(SConnBuffer* buf);
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool transReadComplete(SConnBuffer* connBuf); bool transReadComplete(SConnBuffer* connBuf);
int transSetConnOption(uv_tcp_t* stream);
// int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen); // int transPackMsg(SRpcMsg *rpcMsg, bool sercured, bool auth, char **msg, int32_t *msgLen);
// int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool ); // int transUnpackMsg(char *msg, SRpcMsg *pMsg, bool );
......
...@@ -20,7 +20,10 @@ ...@@ -20,7 +20,10 @@
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
typedef struct SCliConn { typedef struct SCliConn {
T_REF_DECLARE()
uv_connect_t connReq; uv_connect_t connReq;
uv_stream_t* stream; uv_stream_t* stream;
uv_write_t* writeReq; uv_write_t* writeReq;
...@@ -32,8 +35,7 @@ typedef struct SCliConn { ...@@ -32,8 +35,7 @@ typedef struct SCliConn {
int8_t ctnRdCnt; // continue read count int8_t ctnRdCnt; // continue read count
int hThrdIdx; int hThrdIdx;
SRpcPush* push; int persist; //
int persist; //
// spi configure // spi configure
char spi; char spi;
char secured; char secured;
...@@ -41,6 +43,7 @@ typedef struct SCliConn { ...@@ -41,6 +43,7 @@ typedef struct SCliConn {
// debug and log info // debug and log info
struct sockaddr_in addr; struct sockaddr_in addr;
struct sockaddr_in locaddr; struct sockaddr_in locaddr;
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
...@@ -54,14 +57,17 @@ typedef struct SCliThrdObj { ...@@ -54,14 +57,17 @@ typedef struct SCliThrdObj {
pthread_t thread; pthread_t thread;
uv_loop_t* loop; uv_loop_t* loop;
// uv_async_t* cliAsync; // // uv_async_t* cliAsync; //
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
uv_timer_t* timer; uv_timer_t* timer;
void* pool; // conn pool void* pool; // conn pool
// msg queue
queue msg; queue msg;
pthread_mutex_t msgMtx; pthread_mutex_t msgMtx;
uint64_t nextTimeout; // next timeout
void* pTransInst; // uint64_t nextTimeout; // next timeout
bool quit; void* pTransInst; //
bool quit;
} SCliThrdObj; } SCliThrdObj;
typedef struct SClientObj { typedef struct SClientObj {
...@@ -96,7 +102,7 @@ static void clientAsyncCb(uv_async_t* handle); ...@@ -96,7 +102,7 @@ static void clientAsyncCb(uv_async_t* handle);
static void clientDestroy(uv_handle_t* handle); static void clientDestroy(uv_handle_t* handle);
static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
// process data read from server, auth/decompress etc later // process data read from server, add decompress etc later
static void clientHandleResp(SCliConn* conn); static void clientHandleResp(SCliConn* conn);
// handle except about conn // handle except about conn
static void clientHandleExcept(SCliConn* conn); static void clientHandleExcept(SCliConn* conn);
...@@ -104,9 +110,10 @@ static void clientHandleExcept(SCliConn* conn); ...@@ -104,9 +110,10 @@ static void clientHandleExcept(SCliConn* conn);
static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd);
static void clientSendQuit(SCliThrdObj* thrd); static void clientSendQuit(SCliThrdObj* thrd);
static void destroyUserdata(SRpcMsg* userdata); static void destroyUserdata(SRpcMsg* userdata);
static int clientRBChoseIdx(SRpcInfo* pTransInst);
static void destroyCmsg(SCliMsg* cmsg); static void destroyCmsg(SCliMsg* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx); static void transDestroyConnCtx(STransConnCtx* ctx);
// thread obj // thread obj
...@@ -115,10 +122,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -115,10 +122,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
// thread // thread
static void* clientThread(void* arg); static void* clientThread(void* arg);
static void clientHandleResp(SCliConn* conn) { static void* clientNotifyApp() {}
static void clientHandleResp(SCliConn* conn) {
SCliMsg* pMsg = conn->data; SCliMsg* pMsg = conn->data;
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pRpc = pCtx->pTransInst;
SCliThrdObj* pThrd = conn->hostThrd;
SRpcInfo* pTransInst = pThrd->pTransInst;
STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf);
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
...@@ -134,26 +144,24 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -134,26 +144,24 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
if (pRpc->pfp != NULL && (pRpc->pfp)(pRpc->parent, rpcMsg.msgType)) { if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) {
rpcMsg.handle = conn; rpcMsg.handle = conn;
conn->persist = 1; conn->persist = 1;
tDebug("client conn %p persist by app", conn); tDebug("client conn %p persist by app", conn);
} }
tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pRpc->label, conn, tDebug("%s client conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen); inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen);
conn->secured = pHead->secured; conn->secured = pHead->secured;
if (conn->push != NULL && conn->ctnRdCnt != 0) {
(*conn->push->callback)(conn->push->arg, &rpcMsg); if (pCtx->pSem == NULL) {
conn->push = NULL;
} else {
if (pCtx->pSem == NULL) { if (pCtx->pSem == NULL) {
tTrace("%s client conn %p handle resp", pRpc->label, conn); tTrace("%s client conn %p handle resp", pTransInst->label, conn);
(pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
} else { } else {
tTrace("%s client conn(sync) %p handle resp", pRpc->label, conn); tTrace("%s client conn(sync) %p handle resp", pTransInst->label, conn);
memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg));
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
} }
...@@ -162,28 +170,33 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -162,28 +170,33 @@ static void clientHandleResp(SCliConn* conn) {
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
SCliThrdObj* pThrd = conn->hostThrd;
// user owns conn->persist = 1 // user owns conn->persist = 1
if (conn->push == NULL && conn->persist == 0) { if (conn->persist == 0) {
if (pRpc->noPool == true) { if (pTransInst->noPool == true) {
destroyCmsg(conn->data);
clientConnDestroy(conn, true);
} else { } else {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
destroyCmsg(conn->data);
conn->data = NULL;
} }
} else {
// app decide to free or not
} }
destroyCmsg(conn->data);
conn->data = NULL;
// start thread's timer of conn pool if not active // start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { if (!uv_is_active((uv_handle_t*)pThrd->timer) && pTransInst->idleTime > 0) {
// uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); // uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
} }
static void clientHandleExcept(SCliConn* pConn) { static void clientHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL && pConn->push == NULL) { if (pConn->data == NULL) {
// handle conn except in conn pool // handle conn except in conn pool
clientConnDestroy(pConn, true); clientConnDestroy(pConn, true);
return; return;
} }
SCliThrdObj* pThrd = pConn->hostThrd;
SRpcInfo* pTransInst = pThrd->pTransInst;
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
...@@ -192,29 +205,14 @@ static void clientHandleExcept(SCliConn* pConn) { ...@@ -192,29 +205,14 @@ static void clientHandleExcept(SCliConn* pConn) {
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcMsg.msgType = pMsg->msg.msgType + 1; rpcMsg.msgType = pMsg->msg.msgType + 1;
if (pConn->push != NULL && pConn->ctnRdCnt != 0) { if (pCtx->pSem == NULL) {
(*pConn->push->callback)(pConn->push->arg, &rpcMsg); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL);
pConn->push = NULL;
} else { } else {
if (pCtx->pSem == NULL) { memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
(pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); tsem_post(pCtx->pSem);
} else {
memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg));
tsem_post(pCtx->pSem);
}
if (pConn->push != NULL) {
(*pConn->push->callback)(pConn->push->arg, &rpcMsg);
}
pConn->push = NULL;
} }
tTrace("%s client conn %p start to destroy", pCtx->pTransInst->label, pConn); tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn);
if (pConn->push == NULL) {
destroyCmsg(pConn->data);
pConn->data = NULL;
}
// transDestroyConnCtx(pCtx);
clientConnDestroy(pConn, true); clientConnDestroy(pConn, true);
pConn->ctnRdCnt += 1;
} }
static void clientTimeoutCb(uv_timer_t* handle) { static void clientTimeoutCb(uv_timer_t* handle) {
...@@ -316,17 +314,14 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -316,17 +314,14 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
if (transReadComplete(pBuf)) { if (transReadComplete(pBuf)) {
tTrace("client conn %p read complete", conn); tTrace("%s client conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
clientHandleResp(conn); clientHandleResp(conn);
} else { } else {
tTrace("client conn %p read partial packet, continue to read", conn); tTrace("%s client conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
} }
return; return;
} }
if (nread == UV_EOF) {
tError("client conn %p read error: %s", conn, uv_err_name(nread));
clientHandleExcept(conn);
}
assert(nread <= 0); assert(nread <= 0);
if (nread == 0) { if (nread == 0) {
// ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb
...@@ -335,18 +330,16 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf ...@@ -335,18 +330,16 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
return; return;
} }
if (nread < 0) { if (nread < 0) {
tError("client conn %p read error: %s", conn, uv_err_name(nread)); tError("%s client conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread));
clientHandleExcept(conn); clientHandleExcept(conn);
} }
// tDebug("Read error %s\n", uv_err_name(nread));
// uv_close((uv_handle_t*)handle, clientDestroy);
} }
static void clientConnDestroy(SCliConn* conn, bool clear) { static void clientConnDestroy(SCliConn* conn, bool clear) {
// //
conn->ref--; conn->ref--;
if (conn->ref == 0) { if (conn->ref == 0) {
tTrace("client conn %p remove from conn pool", conn); tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
if (clear) { if (clear) {
uv_close((uv_handle_t*)conn->stream, clientDestroy); uv_close((uv_handle_t*)conn->stream, clientDestroy);
...@@ -367,8 +360,9 @@ static void clientDestroy(uv_handle_t* handle) { ...@@ -367,8 +360,9 @@ static void clientDestroy(uv_handle_t* handle) {
static void clientWriteCb(uv_write_t* req, int status) { static void clientWriteCb(uv_write_t* req, int status) {
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status == 0) { if (status == 0) {
tTrace("client conn %p data already was written out", pConn); tTrace("%s client conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
SCliMsg* pMsg = pConn->data; SCliMsg* pMsg = pConn->data;
if (pMsg == NULL) { if (pMsg == NULL) {
// handle // handle
...@@ -376,18 +370,19 @@ static void clientWriteCb(uv_write_t* req, int status) { ...@@ -376,18 +370,19 @@ static void clientWriteCb(uv_write_t* req, int status) {
} }
destroyUserdata(&pMsg->msg); destroyUserdata(&pMsg->msg);
} else { } else {
tError("client conn %p failed to write: %s", pConn, uv_err_name(status)); tError("%s client conn %p failed to write: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
clientHandleExcept(pConn); clientHandleExcept(pConn);
return; return;
} }
SCliThrdObj* pThrd = pConn->hostThrd;
uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb); uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
} }
static void clientWrite(SCliConn* pConn) { static void clientWrite(SCliConn* pConn) {
SCliMsg* pCliMsg = pConn->data; SCliMsg* pCliMsg = pConn->data;
STransConnCtx* pCtx = pCliMsg->ctx; STransConnCtx* pCtx = pCliMsg->ctx;
SRpcInfo* pTransInst = pCtx->pTransInst;
SCliThrdObj* pThrd = pConn->hostThrd;
SRpcInfo* pTransInst = pThrd->pTransInst;
SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg);
...@@ -416,20 +411,18 @@ static void clientWrite(SCliConn* pConn) { ...@@ -416,20 +411,18 @@ static void clientWrite(SCliConn* pConn) {
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
// if (pHead->msgType == TDMT_VND_QUERY || pHead->msgType == TDMT_VND_)
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("client conn %p %s is send to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType), tDebug("%s client conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
ntohs(pConn->locaddr.sin_port)); inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb);
} }
static void clientConnCb(uv_connect_t* req, int status) { static void clientConnCb(uv_connect_t* req, int status) {
// impl later // impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
if (status != 0) { if (status != 0) {
// tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); tError("%s client conn %p failed to connect server: %s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status));
tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status));
clientHandleExcept(pConn); clientHandleExcept(pConn);
return; return;
} }
...@@ -439,7 +432,7 @@ static void clientConnCb(uv_connect_t* req, int status) { ...@@ -439,7 +432,7 @@ static void clientConnCb(uv_connect_t* req, int status) {
addrlen = sizeof(pConn->locaddr); addrlen = sizeof(pConn->locaddr);
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen);
tTrace("client conn %p connect to server successfully", pConn); tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
assert(pConn->stream == req->handle); assert(pConn->stream == req->handle);
clientWrite(pConn); clientWrite(pConn);
...@@ -462,20 +455,18 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -462,20 +455,18 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
et = taosGetTimestampUs(); et = taosGetTimestampUs();
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
SRpcInfo* pTransInst = pThrd->pTransInst;
SCliConn* conn = NULL;
SCliConn* conn = NULL; if (pMsg->msg.handle != NULL) {
if (pMsg->msg.handle == NULL) { conn = (SCliConn*)(pMsg->msg.handle);
if (pCtx->pTransInst->noPool == true) {
} else {
conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
}
if (conn != NULL) { if (conn != NULL) {
tTrace("client conn %p get from conn pool", conn); tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn);
} }
} else { } else {
conn = (SCliConn*)(pMsg->msg.handle); if (pTransInst->noPool == false) {
if (conn != NULL) { conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port);
tTrace("client conn %p reused", conn); if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
} }
} }
...@@ -489,7 +480,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -489,7 +480,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
return; return;
} }
clientWrite(conn); clientWrite(conn);
} else { } else {
conn = calloc(1, sizeof(SCliConn)); conn = calloc(1, sizeof(SCliConn));
conn->ref++; conn->ref++;
...@@ -497,12 +487,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -497,12 +487,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
conn->stream->data = conn; conn->stream->data = conn;
uv_tcp_nodelay((uv_tcp_t*)conn->stream, 1);
int ret = uv_tcp_keepalive((uv_tcp_t*)conn->stream, 1, 1);
if (ret) {
tTrace("client conn %p failed to set keepalive, %s", conn, uv_err_name(ret));
}
// write req handle
conn->writeReq = malloc(sizeof(uv_write_t)); conn->writeReq = malloc(sizeof(uv_write_t));
conn->writeReq->data = conn; conn->writeReq->data = conn;
...@@ -512,17 +497,17 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -512,17 +497,17 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
conn->data = pMsg; conn->data = pMsg;
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
// conn->push = pMsg->msg.push; int ret = transSetConnOption((uv_tcp_t*)conn->stream);
// conn->ctnRdCnt = 0; if (ret) {
tError("%s client conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
}
struct sockaddr_in addr; struct sockaddr_in addr;
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
// handle error in callback if fail to connect // handle error in callback if fail to connect
tTrace("client conn %p try to connect to %s:%d", conn, pMsg->ctx->ip, pMsg->ctx->port); tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
} }
conn->push = pMsg->msg.push;
conn->ctnRdCnt = 0; conn->ctnRdCnt = 0;
conn->hThrdIdx = pCtx->hThrdIdx; conn->hThrdIdx = pCtx->hThrdIdx;
} }
...@@ -548,7 +533,6 @@ static void clientAsyncCb(uv_async_t* handle) { ...@@ -548,7 +533,6 @@ static void clientAsyncCb(uv_async_t* handle) {
} else { } else {
clientHandleReq(pMsg, pThrd); clientHandleReq(pMsg, pThrd);
} }
// clientHandleReq(pMsg, pThrd);
count++; count++;
} }
if (count >= 2) { if (count >= 2) {
...@@ -656,37 +640,36 @@ void taosCloseClient(void* arg) { ...@@ -656,37 +640,36 @@ void taosCloseClient(void* arg) {
free(cli->pThreadObj); free(cli->pThreadObj);
free(cli); free(cli);
} }
static int clientRBChoseIdx(SRpcInfo* pRpc) { static int clientRBChoseIdx(SRpcInfo* pTransInst) {
int64_t index = pRpc->index; int64_t index = pTransInst->index;
if (pRpc->index++ >= pRpc->numOfThreads) { if (pTransInst->index++ >= pTransInst->numOfThreads) {
pRpc->index = 0; pTransInst->index = 0;
} }
return index % pRpc->numOfThreads; return index % pTransInst->numOfThreads;
} }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
// impl later // impl later
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port; uint32_t port = pEpSet->eps[pEpSet->inUse].port;
SRpcInfo* pRpc = (SRpcInfo*)shandle; SRpcInfo* pTransInst = (SRpcInfo*)shandle;
int index = CONN_HOST_THREAD_INDEX(pMsg->handle); int index = CONN_HOST_THREAD_INDEX(pMsg->handle);
if (index == -1) { if (index == -1) {
index = clientRBChoseIdx(pRpc); index = clientRBChoseIdx(pTransInst);
} }
int32_t flen = 0; int32_t flen = 0;
if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) {
// imp later // imp later
} }
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->pTransInst = (SRpcInfo*)shandle;
pCtx->ahandle = pMsg->ahandle; pCtx->ahandle = pMsg->ahandle;
pCtx->msgType = pMsg->msgType; pCtx->msgType = pMsg->msgType;
pCtx->ip = strdup(ip); pCtx->ip = strdup(ip);
pCtx->port = port; pCtx->port = port;
pCtx->hThrdIdx = index; pCtx->hThrdIdx = index;
assert(pRpc->connType == TAOS_CONN_CLIENT); assert(pTransInst->connType == TAOS_CONN_CLIENT);
// atomic or not // atomic or not
SCliMsg* cliMsg = malloc(sizeof(SCliMsg)); SCliMsg* cliMsg = malloc(sizeof(SCliMsg));
...@@ -694,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* ...@@ -694,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
cliMsg->msg = *pMsg; cliMsg->msg = *pMsg;
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index];
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
} }
...@@ -702,15 +685,14 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { ...@@ -702,15 +685,14 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn);
uint32_t port = pEpSet->eps[pEpSet->inUse].port; uint32_t port = pEpSet->eps[pEpSet->inUse].port;
SRpcInfo* pRpc = (SRpcInfo*)shandle; SRpcInfo* pTransInst = (SRpcInfo*)shandle;
int index = CONN_HOST_THREAD_INDEX(pReq->handle); int index = CONN_HOST_THREAD_INDEX(pReq->handle);
if (index == -1) { if (index == -1) {
index = clientRBChoseIdx(pRpc); index = clientRBChoseIdx(pTransInst);
} }
STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx));
pCtx->pTransInst = (SRpcInfo*)shandle;
pCtx->ahandle = pReq->ahandle; pCtx->ahandle = pReq->ahandle;
pCtx->msgType = pReq->msgType; pCtx->msgType = pReq->msgType;
pCtx->ip = strdup(ip); pCtx->ip = strdup(ip);
...@@ -725,7 +707,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { ...@@ -725,7 +707,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
cliMsg->msg = *pReq; cliMsg->msg = *pReq;
cliMsg->st = taosGetTimestampUs(); cliMsg->st = taosGetTimestampUs();
SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index]; SCliThrdObj* thrd = ((SClientObj*)pTransInst->tcphandle)->pThreadObj[index];
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
tsem_t* pSem = pCtx->pSem; tsem_t* pSem = pCtx->pSem;
tsem_wait(pSem); tsem_wait(pSem);
......
...@@ -257,6 +257,12 @@ int transDestroyBuffer(SConnBuffer* buf) { ...@@ -257,6 +257,12 @@ int transDestroyBuffer(SConnBuffer* buf) {
transClearBuffer(buf); transClearBuffer(buf);
} }
int transSetConnOption(uv_tcp_t* stream) {
uv_tcp_nodelay(stream, 1);
int ret = uv_tcp_keepalive(stream, 5, 5);
return ret;
}
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
SAsyncPool* pool = calloc(1, sizeof(SAsyncPool)); SAsyncPool* pool = calloc(1, sizeof(SAsyncPool));
pool->index = 0; pool->index = 0;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "transComm.h" #include "transComm.h"
typedef struct SSrvConn { typedef struct SSrvConn {
T_REF_DECLARE()
uv_tcp_t* pTcp; uv_tcp_t* pTcp;
uv_write_t* pWriter; uv_write_t* pWriter;
uv_timer_t* pTimer; uv_timer_t* pTimer;
...@@ -67,16 +68,19 @@ typedef struct SWorkThrdObj { ...@@ -67,16 +68,19 @@ typedef struct SWorkThrdObj {
} SWorkThrdObj; } SWorkThrdObj;
typedef struct SServerObj { typedef struct SServerObj {
pthread_t thread; pthread_t thread;
uv_tcp_t server; uv_tcp_t server;
uv_loop_t* loop; uv_loop_t* loop;
// work thread info
int workerIdx; int workerIdx;
int numOfThreads; int numOfThreads;
SWorkThrdObj** pThreadObj; SWorkThrdObj** pThreadObj;
uv_pipe_t** pipe;
uint32_t ip; uv_pipe_t** pipe;
uint32_t port; uint32_t ip;
uv_async_t* pAcceptAsync; // just to quit from from accept thread uint32_t port;
uv_async_t* pAcceptAsync; // just to quit from from accept thread
} SServerObj; } SServerObj;
static const char* notify = "a"; static const char* notify = "a";
...@@ -493,13 +497,11 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -493,13 +497,11 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_tcp_init(pThrd->loop, pConn->pTcp); uv_tcp_init(pThrd->loop, pConn->pTcp);
pConn->pTcp->data = pConn; pConn->pTcp->data = pConn;
// uv_tcp_nodelay(pConn->pTcp, 1);
// uv_tcp_keepalive(pConn->pTcp, 1, 1);
// init write request, just
pConn->pWriter = calloc(1, sizeof(uv_write_t)); pConn->pWriter = calloc(1, sizeof(uv_write_t));
pConn->pWriter->data = pConn; pConn->pWriter->data = pConn;
transSetConnOption((uv_tcp_t*)pConn->pTcp);
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
uv_os_fd_t fd; uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
......
...@@ -124,6 +124,7 @@ int main(int argc, char *argv[]) { ...@@ -124,6 +124,7 @@ int main(int argc, char *argv[]) {
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.spi = 1; rpcInit.spi = 1;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcDebugFlag = 143;
for (int i = 1; i < argc; ++i) { for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
......
...@@ -125,6 +125,8 @@ int main(int argc, char *argv[]) { ...@@ -125,6 +125,8 @@ int main(int argc, char *argv[]) {
rpcInit.idleTime = 2 * 1500; rpcInit.idleTime = 2 * 1500;
rpcInit.afp = retrieveAuthInfo; rpcInit.afp = retrieveAuthInfo;
rpcDebugFlag = 143;
for (int i = 1; i < argc; ++i) { for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-p") == 0 && i < argc - 1) { if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
rpcInit.localPort = atoi(argv[++i]); rpcInit.localPort = atoi(argv[++i]);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册