diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f2d844f73d1bb6c253ed60b045341c4a33962d75..00bc1b621fb05445629f0ee21324629e9ee70f98 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -21,6 +21,7 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; uv_write_t* writeReq; + void* hostThrd; SConnBuffer readBuf; void* data; queue conn; @@ -45,7 +46,7 @@ typedef struct SCliThrdObj { queue msg; pthread_mutex_t msgMtx; uint64_t nextTimeout; // next timeout - void* shandle; // + void* pTransInst; // } SCliThrdObj; @@ -69,7 +70,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* // register timer in each thread to clear expire conn static void clientTimeoutCb(uv_timer_t* handle); -// process data read from server, auth/decompress etc +// process data read from server, auth/decompress etc later static void clientProcessData(SCliConn* conn); // check whether already read complete packet from server static bool clientReadComplete(SConnBuffer* pBuf); @@ -91,20 +92,25 @@ static void* clientThread(void* arg); static void clientProcessData(SCliConn* conn) { STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; - SRpcInfo* pRpc = pCtx->ahandle; + SRpcInfo* pRpc = pCtx->pRpc; SRpcMsg rpcMsg; rpcMsg.pCont = conn->readBuf.buf; rpcMsg.contLen = conn->readBuf.len; rpcMsg.ahandle = pCtx->ahandle; (pRpc->cfp)(NULL, &rpcMsg, NULL); + + SCliThrdObj* pThrd = conn->hostThrd; + addConnToCache(pThrd->cache, pCtx->ip, pCtx->port, conn); + free(pCtx->ip); + free(pCtx); // impl } static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; - SRpcInfo* pRpc = pThrd->shandle; + SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL); @@ -127,7 +133,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { } static void* connCacheCreate(int size) { SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - return false; + return cache; } static void* connCacheDestroy(void* cache) { SConnList* connList = taosHashIterate((SHashObj*)cache, NULL); @@ -153,8 +159,9 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { if (plist == NULL) { SConnList list; plist = &list; - QUEUE_INIT(&plist->conn); taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist)); + plist = taosHashGet(pCache, key, strlen(key)); + QUEUE_INIT(&plist->conn); } if (QUEUE_IS_EMPTY(&plist->conn)) { @@ -169,8 +176,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); - STransConnCtx* ctx = ((SCliMsg*)conn->data)->ctx; - SRpcInfo* pRpc = ctx->pRpc; + SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key)); // list already create before @@ -200,10 +206,11 @@ static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; if (pBuf->cap == 0) { - pBuf->buf = (char*)calloc(CAPACITY, sizeof(char)); + pBuf->buf = (char*)calloc(1, CAPACITY * sizeof(char)); pBuf->len = 0; pBuf->cap = CAPACITY; pBuf->left = -1; + buf->base = pBuf->buf; buf->len = CAPACITY; } else { @@ -213,7 +220,7 @@ static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, pBuf->buf = realloc(pBuf->buf, pBuf->cap); } else if (pBuf->len + pBuf->left > pBuf->cap) { pBuf->cap = pBuf->len + pBuf->left; - pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left); + pBuf->buf = realloc(pBuf->buf, pBuf->cap); } } buf->base = pBuf->buf + pBuf->len; @@ -227,7 +234,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf if (nread > 0) { pBuf->len += nread; if (clientReadComplete(pBuf)) { - tDebug("alread read complete pack"); + tDebug("alread read complete"); clientProcessData(conn); } else { tDebug("read halp packet, continue to read"); @@ -260,7 +267,12 @@ static void clientWriteCb(uv_write_t* req, int status) { uv_close((uv_handle_t*)pConn->stream, clientDestroy); return; } - + SCliThrdObj* pThrd = pConn->hostThrd; + if (pConn->stream == NULL) { + pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream); + pConn->stream->data = pConn; + } uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb); // impl later } @@ -270,35 +282,35 @@ static void clientWrite(SCliConn* pConn) { SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); - int msgLen = transMsgLenFromCont(pMsg->contLen); - char* msg = (char*)(pHead); + int msgLen = transMsgLenFromCont(pMsg->contLen); + + pHead->msgType = pMsg->msgType; + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - uv_buf_t wb = uv_buf_init(msg, msgLen); + uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); + tDebug("data write out, msgType : %d, len: %d", pHead->msgType, msgLen); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; - if (status != 0) { - tError("failed to connect %s", uv_err_name(status)); - clientConnDestroy(pConn); - return; - } + SCliMsg* pMsg = pConn->data; - SCliMsg* pMsg = pConn->data; - STransConnCtx* pCtx = ((SCliMsg*)(pConn->data))->ctx; - - SRpcMsg rpcMsg; - rpcMsg.ahandle = pCtx->ahandle; + STransConnCtx* pCtx = pMsg->ctx; + SRpcInfo* pRpc = pCtx->pRpc; if (status != 0) { + // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); + tError("failed to connect server, errmsg: %s", uv_strerror(status)); // call user fp later - tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); - SRpcInfo* pRpc = pMsg->ctx->pRpc; + SRpcMsg rpcMsg; + rpcMsg.ahandle = pCtx->ahandle; + // SRpcInfo* pRpc = pMsg->ctx->pRpc; (pRpc->cfp)(NULL, &rpcMsg, NULL); uv_close((uv_handle_t*)req->handle, clientDestroy); return; } + assert(pConn->stream == req->handle); clientWrite(pConn); } @@ -315,17 +327,27 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { // impl later conn->data = pMsg; conn->writeReq->data = conn; + + conn->readBuf.len = 0; + memset(conn->readBuf.buf, 0, conn->readBuf.cap); + conn->readBuf.left = -1; clientWrite(conn); } else { SCliConn* conn = calloc(1, sizeof(SCliConn)); + // read/write stream handle conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + conn->stream->data = conn; + + // write req handle conn->writeReq = malloc(sizeof(uv_write_t)); + conn->writeReq->data = conn; QUEUE_INIT(&conn->conn); conn->connReq.data = conn; conn->data = pMsg; + conn->hostThrd = pThrd; struct sockaddr_in addr; uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr); @@ -359,23 +381,24 @@ static void clientAsyncCb(uv_async_t* handle) { static void* clientThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; - SRpcInfo* pRpc = pThrd->shandle; - pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; - uv_timer_start(pThrd->pTimer, clientTimeoutCb, pRpc->idleTime * 10, 0); + uv_run(pThrd->loop, UV_RUN_DEFAULT); } void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SClientObj* cli = calloc(1, sizeof(SClientObj)); + SRpcInfo* pRpc = shandle; memcpy(cli->label, label, strlen(label)); cli->numOfThreads = numOfThreads; cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); for (int i = 0; i < cli->numOfThreads; i++) { SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); + QUEUE_INIT(&pThrd->msg); pthread_mutex_init(&pThrd->msgMtx, NULL); + pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); @@ -385,8 +408,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, pThrd->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pThrd->loop, pThrd->pTimer); + pThrd->pTimer->data = pThrd; + pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; - pThrd->shandle = shandle; + pThrd->cache = connCacheCreate(1); + pThrd->pTransInst = shandle; int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); if (err == 0) { diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 45425410430d74ec7754b413af95d5b03d858788..77b5f635f4e4b31cd8d9c8681a358d4cdfb0fa8c 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -24,13 +24,15 @@ typedef struct SConn { uv_async_t* pWorkerAsync; queue queue; int ref; - int persist; // persist connection or not - SConnBuffer connBuf; // read buf, - SConnBuffer writeBuf; // write buf + int persist; // persist connection or not + SConnBuffer connBuf; // read buf, int count; - void* shandle; // rpc init - void* ahandle; // + int inType; + void* pTransInst; // rpc init + void* ahandle; // void* hostThrd; + + SRpcMsg sendMsg; // del later char secured; int spi; @@ -48,7 +50,7 @@ typedef struct SWorkThrdObj { uv_async_t* workerAsync; // queue conn; pthread_mutex_t connMtx; - void* shandle; + void* pTransInst; } SWorkThrdObj; typedef struct SServerObj { @@ -66,7 +68,7 @@ typedef struct SServerObj { static const char* notify = "a"; // refactor later -static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen); +static int transAddAuthPart(SConn* pConn, char* msg, int msgLen); static int uvAuthMsg(SConn* pConn, char* msg, int msgLen); @@ -75,10 +77,13 @@ static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void uvOnTimeoutCb(uv_timer_t* handle); static void uvOnWriteCb(uv_write_t* req, int status); +static void uvOnPipeWriteCb(uv_write_t* req, int status); static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); static void uvWorkerAsyncCb(uv_async_t* handle); +static void uvPrepareSendData(SConn* conn, uv_buf_t* wb); + // already read complete packet static bool readComplete(SConnBuffer* buf); @@ -135,25 +140,28 @@ static bool readComplete(SConnBuffer* data) { if (msgLen > data->len) { data->left = msgLen - data->len; return false; - } else { + } else if (msgLen == data->len) { return true; + } else if (msgLen < data->len) { + return false; + // handle other packet later } } else { return false; } } -static void uvDoProcess(SRecvInfo* pRecv) { - // impl later - STransMsgHead* pHead = (STransMsgHead*)pRecv->msg; - SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; - SConn* pConn = pRecv->thandle; - tDump(pRecv->msg, pRecv->msgLen); - terrno = 0; - // SRpcReqContext* pContest; - - // do auth and check -} +// static void uvDoProcess(SRecvInfo* pRecv) { +// // impl later +// STransMsgHead* pHead = (STransMsgHead*)pRecv->msg; +// SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; +// SConn* pConn = pRecv->thandle; +// tDump(pRecv->msg, pRecv->msgLen); +// terrno = 0; +// // SRpcReqContext* pContest; +// +// // do auth and check +//} static int uvAuthMsg(SConn* pConn, char* msg, int len) { STransMsgHead* pHead = (STransMsgHead*)msg; @@ -222,12 +230,13 @@ static void uvProcessData(SConn* pConn) { p->msgLen = pBuf->len; p->ip = 0; p->port = 0; - p->shandle = pConn->shandle; // + p->shandle = pConn->pTransInst; // p->thandle = pConn; p->chandle = NULL; - // STransMsgHead* pHead = (STransMsgHead*)p->msg; + + pConn->inType = pHead->msgType; assert(transIsReq(pHead->msgType)); SRpcInfo* pRpc = (SRpcInfo*)p->shandle; @@ -247,7 +256,9 @@ static void uvProcessData(SConn* pConn) { // add compress later // pHead = rpcDecompressRpcMsg(pHead); } else { + pHead->msgLen = htonl(pHead->msgLen); // impl later + // } rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.pCont = pHead->content; @@ -257,7 +268,7 @@ static void uvProcessData(SConn* pConn) { rpcMsg.handle = pConn; (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); - uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime, 0); + uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth // validate msg type } @@ -277,8 +288,9 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { return; } if (nread != UV_EOF) { - tDebug("Read error %s\n", uv_err_name(nread)); + tDebug("Read error %s", uv_err_name(nread)); } + tDebug("read error %s", uv_err_name(nread)); uv_close((uv_handle_t*)cli, uvConnDestroy); } void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -293,16 +305,48 @@ void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnWriteCb(uv_write_t* req, int status) { SConn* conn = req->data; + + SConnBuffer* buf = &conn->connBuf; + buf->len = 0; + memset(buf->buf, 0, buf->cap); + buf->left = -1; if (status == 0) { tDebug("data already was written on stream"); } else { + tDebug("failed to write data, %s", uv_err_name(status)); connDestroy(conn); } // opt } +static void uvOnPipeWriteCb(uv_write_t* req, int status) { + if (status == 0) { + tDebug("success to dispatch conn to work thread"); + } else { + tError("fail to dispatch conn to work thread"); + } +} +static void uvPrepareSendData(SConn* conn, uv_buf_t* wb) { + // impl later + SRpcMsg* pMsg = &conn->sendMsg; + if (pMsg->pCont == 0) { + pMsg->pCont = (void*)rpcMallocCont(0); + pMsg->contLen = 0; + } + STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + pHead->msgType = conn->inType + 1; + // add more info + char* msg = (char*)pHead; + int32_t len = transMsgLenFromCont(pMsg->contLen); + if (transCompressMsg(msg, len, NULL)) { + // impl later + } + pHead->msgLen = htonl(len); + wb->base = msg; + wb->len = len; +} void uvWorkerAsyncCb(uv_async_t* handle) { - SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync); + SWorkThrdObj* pThrd = handle->data; SConn* conn = NULL; queue wq; // batch process to avoid to lock/unlock frequently @@ -318,8 +362,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) { tError("except occurred, do nothing"); return; } - uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); - + uv_buf_t wb; + uvPrepareSendData(conn, &wb); uv_timer_stop(conn->pTimer); uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); @@ -341,8 +385,9 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads; + tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); - uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb); + uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); } else { uv_close((uv_handle_t*)cli, NULL); } @@ -374,7 +419,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { assert(pending == UV_TCP); SConn* pConn = connCreate(); - pConn->shandle = pThrd->shandle; + pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ pConn->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pThrd->loop, pConn->pTimer); @@ -398,6 +443,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tDebug("new connection created: %d", fd); uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); } else { + tDebug("failed to create new connection"); connDestroy(pConn); } } @@ -418,14 +464,12 @@ void* acceptThread(void* arg) { } uv_run(srv->loop, UV_RUN_DEFAULT); } -void* workerThread(void* arg) { - SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; - +static void initWorkThrdObj(SWorkThrdObj* pThrd) { pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); // SRpcInfo* pRpc = pThrd->shandle; - uv_pipe_init(pThrd->loop, pThrd->pipe, 0); + uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_open(pThrd->pipe, pThrd->fd); pThrd->pipe->data = pThrd; @@ -435,8 +479,12 @@ void* workerThread(void* arg) { pThrd->workerAsync = malloc(sizeof(uv_async_t)); uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb); + pThrd->workerAsync->data = pThrd; uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); +} +void* workerThread(void* arg) { + SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT); } @@ -444,34 +492,39 @@ static SConn* connCreate() { SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); return pConn; } +static void connCloseCb(uv_handle_t* handle) { + // impl later + // +} static void connDestroy(SConn* conn) { if (conn == NULL) { return; } uv_timer_stop(conn->pTimer); free(conn->pTimer); - uv_close((uv_handle_t*)conn->pTcp, NULL); - free(conn->connBuf.buf); + // uv_close((uv_handle_t*)conn->pTcp, connCloseCb); free(conn->pTcp); + free(conn->connBuf.buf); free(conn->pWriter); - free(conn); + // free(conn); // handle } static void uvConnDestroy(uv_handle_t* handle) { SConn* conn = handle->data; connDestroy(conn); } -static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen) { - SRpcHead* pHead = (SRpcHead*)msg; +static int transAddAuthPart(SConn* pConn, char* msg, int msgLen) { + STransMsgHead* pHead = (STransMsgHead*)msg; if (pConn->spi && pConn->secured == 0) { // add auth part pHead->spi = pConn->spi; - SRpcDigest* pDigest = (SRpcDigest*)(msg + msgLen); + STransDigestMsg* pDigest = (STransDigestMsg*)(msg + msgLen); pDigest->timeStamp = htonl(taosGetTimestampSec()); msgLen += sizeof(SRpcDigest); pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); + // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); + // transBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); } else { pHead->spi = 0; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); @@ -502,9 +555,11 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - thrd->shandle = shandle; + thrd->pTransInst = shandle; thrd->fd = fds[0]; thrd->pipe = &(srv->pipe[i][1]); // init read + + initWorkThrdObj(thrd); int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); if (err == 0) { tDebug("sucess to create worker-thread %d", i); @@ -547,6 +602,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) { SWorkThrdObj* pThrd = pConn->hostThrd; // opt later + pConn->sendMsg = *pMsg; pthread_mutex_lock(&pThrd->connMtx); QUEUE_PUSH(&pThrd->conn, &pConn->queue); pthread_mutex_unlock(&pThrd->connMtx);