提交 60b48e3e 编写于 作者: dengyihao's avatar dengyihao

refactor code

上级 fdb79077
......@@ -21,31 +21,6 @@
extern "C" {
#endif
#ifdef USE_UV
typedef struct {
char version : 4; // RPC version
char comp : 4; // compression algorithm, 0:no compression 1:lz4
char resflag : 2; // reserved bits
char spi : 3; // security parameter index
char encrypt : 3; // encrypt algorithm, 0: no encryption
uint16_t tranId; // transcation ID
uint32_t linkUid; // for unique connection ID assigned by client
uint64_t ahandle; // ahandle assigned by client
uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list
uint32_t destIp; // destination IP address, for NAT scenario
char user[TSDB_UNI_LEN]; // user ID
uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved
uint16_t msgType; // message type
int32_t msgLen; // message length including the header iteslf
uint32_t msgVer;
int32_t code; // code in response message
uint8_t content[0]; // message body starts from here
} SRpcHead;
#else
#define RPC_CONN_TCP 2
extern int tsRpcOverhead;
......@@ -96,7 +71,6 @@ typedef struct {
} SRpcDigest;
#pragma pack(pop)
#endif
#ifdef __cplusplus
}
......
......@@ -21,8 +21,6 @@
extern "C" {
#endif
#ifdef USE_UV
#else
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle);
void taosStopTcpServer(void *param);
void taosCleanUpTcpServer(void *param);
......@@ -35,8 +33,6 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
void taosCloseTcpConnection(void *chandle);
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -16,6 +16,7 @@
#ifndef _TD_TRANSPORT_INT_H_
#define _TD_TRANSPORT_INT_H_
#include "rpcHead.h"
#ifdef __cplusplus
extern "C" {
#endif
......
......@@ -22,9 +22,6 @@
#include "ttimer.h"
#include "tutil.h"
#ifdef USE_UV
#else
typedef struct SConnHash {
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
......@@ -295,4 +292,3 @@ static void rpcUnlockCache(int64_t *lockedBy) {
assert(false);
}
}
#endif
......@@ -13,9 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#include <uv.h>
#endif
#include "lz4.h"
#include "os.h"
#include "rpcCache.h"
......@@ -36,6 +33,17 @@
#include "ttimer.h"
#include "tutil.h"
static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT;
int tsRpcMaxUdpSize = 15000; // bytes
int tsProgressTimer = 100;
// not configurable
int tsRpcMaxRetry;
int tsRpcHeadSize;
int tsRpcOverhead;
#ifndef USE_UV
typedef struct {
int sessions; // number of sessions allowed
int numOfThreads; // number of threads to process incoming messages
......@@ -51,28 +59,28 @@ typedef struct {
char secret[TSDB_PASSWORD_LEN]; // secret for the link
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t refCount;
void* parent;
void* idPool; // handle to ID pool
void* tmrCtrl; // handle to timer
SHashObj* hash; // handle returned by hash utility
void* tcphandle; // returned handle from TCP initialization
void* udphandle; // returned handle from UDP initialization
void* pCache; // connection cache
void * parent;
void * idPool; // handle to ID pool
void * tmrCtrl; // handle to timer
SHashObj * hash; // handle returned by hash utility
void * tcphandle; // returned handle from TCP initialization
void * udphandle; // returned handle from UDP initialization
void * pCache; // connection cache
pthread_mutex_t mutex;
struct SRpcConn* connList; // connection list
struct SRpcConn *connList; // connection list
} SRpcInfo;
typedef struct {
SRpcInfo* pRpc; // associated SRpcInfo
SRpcInfo * pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app
struct SRpcConn* pConn; // pConn allocated
void * ahandle; // handle provided by app
struct SRpcConn *pConn; // pConn allocated
tmsg_t msgType; // message type
uint8_t* pCont; // content provided by app
uint8_t * pCont; // content provided by app
int32_t contLen; // content length
int32_t code; // error code
int16_t numOfTry; // number of try for different servers
......@@ -80,394 +88,14 @@ typedef struct {
int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
SEpSet* pSet; // for synchronous API
SRpcMsg * pRsp; // for synchronous API
tsem_t * pSem; // for synchronous API
SEpSet * pSet; // for synchronous API
char msg[0]; // RpcHead starts from here
} SRpcReqContext;
#ifdef USE_UV
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext))
static const char* notify = "a";
typedef struct SThreadObj {
pthread_t thread;
uv_pipe_t* pipe;
uv_loop_t* loop;
uv_async_t* workerAsync; //
int fd;
queue conn;
pthread_mutex_t connMtx;
} SThreadObj;
typedef struct SServerObj {
pthread_t thread;
uv_tcp_t server;
uv_loop_t* loop;
int workerIdx;
int numOfThread;
SThreadObj** pThreadObj;
uv_pipe_t** pipe;
uint32_t ip;
uint32_t port;
} SServerObj;
typedef struct SConnBuffer {
char* buf;
int len;
int cap;
int left;
} SConnBuffer;
typedef struct SConnCtx {
uv_tcp_t* pTcp;
uv_write_t* pWriter;
uv_timer_t* pTimer;
uv_async_t* pWorkerAsync;
queue queue;
int ref;
int persist; // persist connection or not
SConnBuffer connBuf;
int count;
} SConnCtx;
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void uvOnTimeoutCb(uv_timer_t* handle);
static void uvOnWriteCb(uv_write_t* req, int status);
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
static SConnCtx* connCtxCreate();
static void connCtxDestroy(SConnCtx* ctx);
static void uvConnCtxDestroy(uv_handle_t* handle);
static void* workerThread(void* arg);
static void* acceptThread(void* arg);
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
int32_t rpcInit() { return -1; }
void rpcCleanup() { return; };
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
// opte
}
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SServerObj* srv = calloc(1, sizeof(SServerObj));
srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
srv->numOfThread = numOfThreads;
srv->workerIdx = 0;
srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*));
srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*));
srv->ip = ip;
srv->port = port;
uv_loop_init(srv->loop);
for (int i = 0; i < srv->numOfThread; i++) {
SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj));
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
int fds[2];
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
return NULL;
}
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
thrd->fd = fds[0];
thrd->pipe = &(srv->pipe[i][1]); // init read
int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
if (err == 0) {
tDebug("sucess to create worker-thread %d", i);
// printf("thread %d create\n", i);
} else {
// TODO: clear all other resource later
tError("failed to create worker-thread %d", i);
}
srv->pThreadObj[i] = thrd;
}
int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
if (err == 0) {
tDebug("success to create accept-thread");
} else {
// clear all resource later
}
return srv;
}
void* rpcOpen(const SRpcInit* pInit) {
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) {
return NULL;
}
if (pInit->label) {
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
}
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
return pRpc;
}
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
static const int CAPACITY = 1024;
/*
* formate of data buffer:
* |<-------SRpcReqContext------->|<------------data read from socket----------->|
*/
SConnCtx* ctx = handle->data;
SConnBuffer* pBuf = &ctx->connBuf;
if (pBuf->cap == 0) {
pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char));
pBuf->len = 0;
pBuf->cap = CAPACITY;
pBuf->left = -1;
buf->base = pBuf->buf + RPC_RESERVE_SIZE;
buf->len = CAPACITY;
} else {
if (pBuf->len >= pBuf->cap) {
if (pBuf->left == -1) {
pBuf->cap *= 2;
pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE);
} else if (pBuf->len + pBuf->left > pBuf->cap) {
pBuf->cap = pBuf->len + pBuf->left;
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE);
}
}
buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE;
buf->len = pBuf->cap - pBuf->len;
}
}
// check data read from socket completely or not
//
static bool isReadAll(SConnBuffer* data) {
// TODO(yihao): handle pipeline later
SRpcHead rpcHead;
int32_t headLen = sizeof(rpcHead);
if (data->len >= headLen) {
memcpy((char*)&rpcHead, data->buf, headLen);
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
if (msgLen > data->len) {
data->left = msgLen - data->len;
return false;
} else {
return true;
}
} else {
return false;
}
}
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt
SConnCtx* ctx = cli->data;
SConnBuffer* pBuf = &ctx->connBuf;
if (nread > 0) {
pBuf->len += nread;
if (isReadAll(pBuf)) {
tDebug("alread read complete packet");
} else {
tDebug("read half packet, continue to read");
}
return;
}
if (nread != UV_EOF) {
tDebug("Read error %s\n", uv_err_name(nread));
}
uv_close((uv_handle_t*)cli, uvConnCtxDestroy);
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = malloc(sizeof(char));
buf->len = 2;
}
void uvOnTimeoutCb(uv_timer_t* handle) {
// opt
tDebug("time out");
}
void uvOnWriteCb(uv_write_t* req, int status) {
SConnCtx* ctx = req->data;
if (status == 0) {
tDebug("data already was written on stream");
} else {
connCtxDestroy(ctx);
}
// opt
}
void uvWorkerAsyncCb(uv_async_t* handle) {
SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync);
SConnCtx* conn = NULL;
// opt later
pthread_mutex_lock(&pObj->connMtx);
if (!QUEUE_IS_EMPTY(&pObj->conn)) {
queue* head = QUEUE_HEAD(&pObj->conn);
conn = QUEUE_DATA(head, SConnCtx, queue);
QUEUE_REMOVE(&conn->queue);
}
pthread_mutex_unlock(&pObj->connMtx);
if (conn == NULL) {
tError("except occurred, do nothing");
return;
}
}
void uvOnAcceptCb(uv_stream_t* stream, int status) {
if (status == -1) {
return;
}
SServerObj* pObj = container_of(stream, SServerObj, server);
uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pObj->loop, cli);
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb);
} else {
uv_close((uv_handle_t*)cli, NULL);
}
}
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
tDebug("connection coming");
if (nread < 0) {
if (nread != UV_EOF) {
tError("read error %s", uv_err_name(nread));
}
// TODO(log other failure reason)
uv_close((uv_handle_t*)q, NULL);
return;
}
// free memory allocated by
assert(nread == strlen(notify));
assert(buf->base[0] == notify[0]);
free(buf->base);
SThreadObj* pObj = (SThreadObj*)container_of(q, struct SThreadObj, pipe);
uv_pipe_t* pipe = (uv_pipe_t*)q;
if (!uv_pipe_pending_count(pipe)) {
tError("No pending count");
return;
}
uv_handle_type pending = uv_pipe_pending_type(pipe);
assert(pending == UV_TCP);
SConnCtx* pConn = connCtxCreate();
/* init conn timer*/
pConn->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pObj->loop, pConn->pTimer);
pConn->pWorkerAsync = pObj->workerAsync; // thread safty
// init client handle
pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pObj->loop, pConn->pTcp);
pConn->pTcp->data = pConn;
// init write request, just
pConn->pWriter = calloc(1, sizeof(uv_write_t));
pConn->pWriter->data = pConn;
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
tDebug("new connection created: %d", fd);
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
} else {
connCtxDestroy(pConn);
}
}
void* acceptThread(void* arg) {
// opt
SServerObj* srv = (SServerObj*)arg;
uv_tcp_init(srv->loop, &srv->server);
struct sockaddr_in bind_addr;
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
int err = 0;
if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
tError("Listen error %s\n", uv_err_name(err));
return NULL;
}
uv_run(srv->loop, UV_RUN_DEFAULT);
}
void* workerThread(void* arg) {
SThreadObj* pObj = (SThreadObj*)arg;
pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pObj->loop);
uv_pipe_init(pObj->loop, pObj->pipe, 1);
uv_pipe_open(pObj->pipe, pObj->fd);
QUEUE_INIT(&pObj->conn);
pObj->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pObj->loop, pObj->workerAsync, uvWorkerAsyncCb);
uv_read_start((uv_stream_t*)pObj->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
uv_run(pObj->loop, UV_RUN_DEFAULT);
}
static SConnCtx* connCtxCreate() {
SConnCtx* pConn = (SConnCtx*)calloc(1, sizeof(SConnCtx));
return pConn;
}
static void connCtxDestroy(SConnCtx* ctx) {
if (ctx == NULL) {
return;
}
uv_timer_stop(ctx->pTimer);
free(ctx->pTimer);
uv_close((uv_handle_t*)ctx->pTcp, NULL);
free(ctx->pTcp);
free(ctx->pWriter);
free(ctx);
// handle
}
static void uvConnCtxDestroy(uv_handle_t* handle) {
SConnCtx* ctx = handle->data;
connCtxDestroy(ctx);
}
void rpcClose(void* arg) { return; }
void* rpcMallocCont(int contLen) { return NULL; }
void rpcFreeCont(void* cont) { return; }
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; }
void rpcSendResponse(const SRpcMsg* pMsg) {}
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
#else
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcHeadFromCont(cont) ((SRpcHead *)((char *)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
......@@ -510,15 +138,6 @@ typedef struct SRpcConn {
SRpcReqContext *pContext; // request context
} SRpcConn;
static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT;
int tsRpcMaxUdpSize = 15000; // bytes
int tsProgressTimer = 100;
// not configurable
int tsRpcMaxRetry;
int tsRpcHeadSize;
int tsRpcOverhead;
static int tsRpcRefId = -1;
static int32_t tsRpcNum = 0;
// static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
......
......@@ -14,9 +14,6 @@
*/
#include "rpcTcp.h"
#ifdef USE_UV
#include <uv.h>
#endif
#include "os.h"
#include "rpcHead.h"
#include "rpcLog.h"
......@@ -24,9 +21,6 @@
#include "taoserror.h"
#include "tutil.h"
#ifdef USE_UV
#else
typedef struct SFdObj {
void * signature;
SOCKET fd; // TCP socket FD
......@@ -662,5 +656,3 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
tfree(pFdObj);
}
#endif
......@@ -22,9 +22,6 @@
#include "ttimer.h"
#include "tutil.h"
#ifdef USE_UV
// no support upd currently
#else
#define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5 // mseconds
......@@ -260,4 +257,3 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c
return ret;
}
#endif
......@@ -12,3 +12,700 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef USE_UV
#include <uv.h>
#include "lz4.h"
#include "os.h"
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
#include "tidpool.h"
#include "tmd5.h"
#include "tmempool.h"
#include "tmsg.h"
#include "transportInt.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
#include "tutil.h"
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext))
static const char* notify = "a";
typedef struct {
int sessions; // number of sessions allowed
int numOfThreads; // number of threads to process incoming messages
int idleTime; // milliseconds;
uint16_t localPort;
int8_t connType;
int index; // for UDP server only, round robin for multiple threads
char label[TSDB_LABEL_LEN];
char user[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char secret[TSDB_PASSWORD_LEN]; // secret for the link
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
int32_t refCount;
void* parent;
void* idPool; // handle to ID pool
void* tmrCtrl; // handle to timer
SHashObj* hash; // handle returned by hash utility
void* tcphandle; // returned handle from TCP initialization
void* udphandle; // returned handle from UDP initialization
void* pCache; // connection cache
pthread_mutex_t mutex;
struct SRpcConn* connList; // connection list
} SRpcInfo;
typedef struct {
SRpcInfo* pRpc; // associated SRpcInfo
SEpSet epSet; // ip list provided by app
void* ahandle; // handle provided by app
struct SRpcConn* pConn; // pConn allocated
tmsg_t msgType; // message type
uint8_t* pCont; // content provided by app
int32_t contLen; // content length
int32_t code; // error code
int16_t numOfTry; // number of try for different servers
int8_t oldInUse; // server EP inUse passed by app
int8_t redirect; // flag to indicate redirect
int8_t connType; // connection type
int64_t rid; // refId returned by taosAddRef
SRpcMsg* pRsp; // for synchronous API
tsem_t* pSem; // for synchronous API
SEpSet* pSet; // for synchronous API
char msg[0]; // RpcHead starts from here
} SRpcReqContext;
typedef struct SThreadObj {
pthread_t thread;
uv_pipe_t* pipe;
int fd;
uv_loop_t* loop;
uv_async_t* workerAsync; //
queue conn;
pthread_mutex_t connMtx;
void* shandle;
} SThreadObj;
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
#define rpcIsReq(type) (type & 1U)
typedef struct SServerObj {
pthread_t thread;
uv_tcp_t server;
uv_loop_t* loop;
int workerIdx;
int numOfThread;
SThreadObj** pThreadObj;
uv_pipe_t** pipe;
uint32_t ip;
uint32_t port;
} SServerObj;
typedef struct SConnBuffer {
char* buf;
int len;
int cap;
int left;
} SConnBuffer;
typedef struct SRpcConn {
uv_tcp_t* pTcp;
uv_write_t* pWriter;
uv_timer_t* pTimer;
uv_async_t* pWorkerAsync;
queue queue;
int ref;
int persist; // persist connection or not
SConnBuffer connBuf;
int count;
void* shandle; // rpc init
void* ahandle;
// del later
char secured;
int spi;
char info[64];
char user[TSDB_UNI_LEN]; // user ID for the link
char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
} SRpcConn;
// auth function
static int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
static void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen);
// compress data
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
static SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead);
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void uvOnTimeoutCb(uv_timer_t* handle);
static void uvOnWriteCb(uv_write_t* req, int status);
static void uvOnAcceptCb(uv_stream_t* stream, int status);
static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
static void uvWorkerAsyncCb(uv_async_t* handle);
static SRpcConn* connCreate();
static void connDestroy(SRpcConn* conn);
static void uvConnDestroy(uv_handle_t* handle);
static void* workerThread(void* arg);
static void* acceptThread(void* arg);
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SServerObj* srv = calloc(1, sizeof(SServerObj));
srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
srv->numOfThread = numOfThreads;
srv->workerIdx = 0;
srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*));
srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*));
srv->ip = ip;
srv->port = port;
uv_loop_init(srv->loop);
for (int i = 0; i < srv->numOfThread; i++) {
SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj));
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
int fds[2];
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
return NULL;
}
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
thrd->shandle = shandle;
thrd->fd = fds[0];
thrd->pipe = &(srv->pipe[i][1]); // init read
int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
if (err == 0) {
tDebug("sucess to create worker-thread %d", i);
// printf("thread %d create\n", i);
} else {
// TODO: clear all other resource later
tError("failed to create worker-thread %d", i);
}
srv->pThreadObj[i] = thrd;
}
int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
if (err == 0) {
tDebug("success to create accept-thread");
} else {
// clear all resource later
}
return srv;
}
void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
/*
* formate of data buffer:
* |<-------SRpcReqContext------->|<------------data read from socket----------->|
*/
static const int CAPACITY = 1024;
SRpcConn* ctx = handle->data;
SConnBuffer* pBuf = &ctx->connBuf;
if (pBuf->cap == 0) {
pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char));
pBuf->len = 0;
pBuf->cap = CAPACITY;
pBuf->left = -1;
buf->base = pBuf->buf + RPC_RESERVE_SIZE;
buf->len = CAPACITY;
} else {
if (pBuf->len >= pBuf->cap) {
if (pBuf->left == -1) {
pBuf->cap *= 2;
pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE);
} else if (pBuf->len + pBuf->left > pBuf->cap) {
pBuf->cap = pBuf->len + pBuf->left;
pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE);
}
}
buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE;
buf->len = pBuf->cap - pBuf->len;
}
}
// check data read from socket completely or not
//
static bool isReadAll(SConnBuffer* data) {
// TODO(yihao): handle pipeline later
SRpcHead rpcHead;
int32_t headLen = sizeof(rpcHead);
if (data->len >= headLen) {
memcpy((char*)&rpcHead, data->buf, headLen);
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
if (msgLen > data->len) {
data->left = msgLen - data->len;
return false;
} else {
return true;
}
} else {
return false;
}
}
static void uvDoProcess(SRecvInfo* pRecv) {
SRpcHead* pHead = (SRpcHead*)pRecv->msg;
SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
SRpcConn* pConn = pRecv->thandle;
tDump(pRecv->msg, pRecv->msgLen);
terrno = 0;
SRpcReqContext* pContest;
// do auth and check
}
static int uvAuthData(SRpcConn* pConn, char* msg, int len) {
SRpcHead* pHead = (SRpcHead*)msg;
int code = 0;
if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
// secured link, or no authentication
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
// tTrace("%s, secured link, no auth is required", pConn->info);
return 0;
}
if (!rpcIsReq(pHead->msgType)) {
// for response, if code is auth failure, it shall bypass the auth process
code = htonl(pHead->code);
if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
return 0;
}
}
code = 0;
if (pHead->spi == pConn->spi) {
// authentication
SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest));
int32_t delta;
delta = (int32_t)htonl(pDigest->timeStamp);
delta -= (int32_t)taosGetTimestampSec();
if (abs(delta) > 900) {
tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
} else {
if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
// tDebug("%s, authentication failed, msg discarded", pConn->info);
code = TSDB_CODE_RPC_AUTH_FAILURE;
} else {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client
// tTrace("%s, message is authenticated", pConn->info);
}
}
} else {
tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
}
return code;
}
static void uvProcessData(SRpcConn* ctx) {
SRecvInfo info;
SRecvInfo* p = &info;
SConnBuffer* pBuf = &ctx->connBuf;
p->msg = pBuf->buf + RPC_RESERVE_SIZE;
p->msgLen = pBuf->len;
p->ip = 0;
p->port = 0;
p->shandle = ctx->shandle; //
p->thandle = ctx;
p->chandle = NULL;
//
SRpcHead* pHead = (SRpcHead*)p->msg;
assert(rpcIsReq(pHead->msgType));
SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
SRpcConn* pConn = (SRpcConn*)p->thandle;
pConn->ahandle = (void*)pHead->ahandle;
pHead->code = htonl(pHead->code);
SRpcMsg rpcMsg;
pHead = rpcDecompressRpcMsg(pHead);
rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code;
rpcMsg.ahandle = pConn->ahandle;
rpcMsg.handle = pConn;
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
// auth
// validate msg type
}
void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt
SRpcConn* ctx = cli->data;
SConnBuffer* pBuf = &ctx->connBuf;
if (nread > 0) {
pBuf->len += nread;
if (isReadAll(pBuf)) {
tDebug("alread read complete packet");
uvProcessData(ctx);
} else {
tDebug("read half packet, continue to read");
}
return;
}
if (nread != UV_EOF) {
tDebug("Read error %s\n", uv_err_name(nread));
}
uv_close((uv_handle_t*)cli, uvConnDestroy);
}
void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
buf->base = malloc(sizeof(char));
buf->len = 2;
}
void uvOnTimeoutCb(uv_timer_t* handle) {
// opt
tDebug("time out");
}
void uvOnWriteCb(uv_write_t* req, int status) {
SRpcConn* conn = req->data;
if (status == 0) {
tDebug("data already was written on stream");
} else {
connDestroy(conn);
}
// opt
}
void uvWorkerAsyncCb(uv_async_t* handle) {
SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync);
SRpcConn* conn = NULL;
// opt later
pthread_mutex_lock(&pObj->connMtx);
if (!QUEUE_IS_EMPTY(&pObj->conn)) {
queue* head = QUEUE_HEAD(&pObj->conn);
conn = QUEUE_DATA(head, SRpcConn, queue);
QUEUE_REMOVE(&conn->queue);
}
pthread_mutex_unlock(&pObj->connMtx);
if (conn == NULL) {
tError("except occurred, do nothing");
return;
}
}
void uvOnAcceptCb(uv_stream_t* stream, int status) {
if (status == -1) {
return;
}
SServerObj* pObj = container_of(stream, SServerObj, server);
uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pObj->loop, cli);
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread;
tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb);
} else {
uv_close((uv_handle_t*)cli, NULL);
}
}
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
tDebug("connection coming");
if (nread < 0) {
if (nread != UV_EOF) {
tError("read error %s", uv_err_name(nread));
}
// TODO(log other failure reason)
uv_close((uv_handle_t*)q, NULL);
return;
}
// free memory allocated by
assert(nread == strlen(notify));
assert(buf->base[0] == notify[0]);
free(buf->base);
SThreadObj* pObj = q->data;
uv_pipe_t* pipe = (uv_pipe_t*)q;
if (!uv_pipe_pending_count(pipe)) {
tError("No pending count");
return;
}
uv_handle_type pending = uv_pipe_pending_type(pipe);
assert(pending == UV_TCP);
SRpcConn* pConn = connCreate();
pConn->shandle = pObj->shandle;
/* init conn timer*/
pConn->pTimer = malloc(sizeof(uv_timer_t));
uv_timer_init(pObj->loop, pConn->pTimer);
pConn->pWorkerAsync = pObj->workerAsync; // thread safty
// init client handle
pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pObj->loop, pConn->pTcp);
pConn->pTcp->data = pConn;
// init write request, just
pConn->pWriter = calloc(1, sizeof(uv_write_t));
pConn->pWriter->data = pConn;
if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
uv_os_fd_t fd;
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
tDebug("new connection created: %d", fd);
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
} else {
connDestroy(pConn);
}
}
void* acceptThread(void* arg) {
// opt
SServerObj* srv = (SServerObj*)arg;
uv_tcp_init(srv->loop, &srv->server);
struct sockaddr_in bind_addr;
uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
int err = 0;
if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
tError("Listen error %s\n", uv_err_name(err));
return NULL;
}
uv_run(srv->loop, UV_RUN_DEFAULT);
}
void* workerThread(void* arg) {
SThreadObj* pObj = (SThreadObj*)arg;
pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
uv_loop_init(pObj->loop);
uv_pipe_init(pObj->loop, pObj->pipe, 1);
uv_pipe_open(pObj->pipe, pObj->fd);
pObj->pipe->data = pObj;
QUEUE_INIT(&pObj->conn);
pObj->workerAsync = malloc(sizeof(uv_async_t));
uv_async_init(pObj->loop, pObj->workerAsync, uvWorkerAsyncCb);
uv_read_start((uv_stream_t*)pObj->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
uv_run(pObj->loop, UV_RUN_DEFAULT);
}
static SRpcConn* connCreate() {
SRpcConn* pConn = (SRpcConn*)calloc(1, sizeof(SRpcConn));
return pConn;
}
static void connDestroy(SRpcConn* conn) {
if (conn == NULL) {
return;
}
uv_timer_stop(conn->pTimer);
free(conn->pTimer);
uv_close((uv_handle_t*)conn->pTcp, NULL);
free(conn->pTcp);
free(conn->pWriter);
free(conn);
// handle
}
static void uvConnDestroy(uv_handle_t* handle) {
SRpcConn* conn = handle->data;
connDestroy(conn);
}
void* rpcOpen(const SRpcInit* pInit) {
SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) {
return NULL;
}
if (pInit->label) {
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
}
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
return pRpc;
}
void rpcClose(void* arg) { return; }
void* rpcMallocCont(int contLen) { return NULL; }
void rpcFreeCont(void* cont) { return; }
void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; }
void rpcSendResponse(const SRpcMsg* pMsg) {}
void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
static int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
int ret = -1;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;
return ret;
}
static void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t*)pMsg, msgLen);
tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
memcpy(pAuth, context.digest, sizeof(context.digest));
}
static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen) {
SRpcHead* pHead = (SRpcHead*)msg;
if (pConn->spi && pConn->secured == 0) {
// add auth part
pHead->spi = pConn->spi;
SRpcDigest* pDigest = (SRpcDigest*)(msg + msgLen);
pDigest->timeStamp = htonl(taosGetTimestampSec());
msgLen += sizeof(SRpcDigest);
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
} else {
pHead->spi = 0;
pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
}
return msgLen;
}
static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
SRpcHead* pHead = rpcHeadFromCont(pCont);
int32_t finalLen = 0;
int overhead = sizeof(SRpcComp);
if (!NEEDTO_COMPRESSS_MSG(contLen)) {
return contLen;
}
char* buf = malloc(contLen + overhead + 8); // 8 extra bytes
if (buf == NULL) {
tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
return contLen;
}
int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead);
/*
* only the compressed size is less than the value of contLen - overhead, the compression is applied
* The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
*/
if (compLen > 0 && compLen < contLen - overhead) {
SRpcComp* pComp = (SRpcComp*)pCont;
pComp->reserved = 0;
pComp->contLen = htonl(contLen);
memcpy(pCont + overhead, buf, compLen);
pHead->comp = 1;
tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen);
finalLen = compLen + overhead;
} else {
finalLen = contLen;
}
free(buf);
return finalLen;
}
static SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) {
int overhead = sizeof(SRpcComp);
SRpcHead* pNewHead = NULL;
uint8_t* pCont = pHead->content;
SRpcComp* pComp = (SRpcComp*)pHead->content;
if (pHead->comp) {
// decompress the content
assert(pComp->reserved == 0);
int contLen = htonl(pComp->contLen);
// prepare the temporary buffer to decompress message
char* temp = (char*)malloc(contLen + RPC_MSG_OVERHEAD);
pNewHead = (SRpcHead*)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext
if (pNewHead) {
int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char*)pNewHead->content, compLen, contLen);
assert(origLen == contLen);
memcpy(pNewHead, pHead, sizeof(SRpcHead));
pNewHead->msgLen = rpcMsgLenFromCont(origLen);
/// rpcFreeMsg(pHead); // free the compressed message buffer
pHead = pNewHead;
tTrace("decomp malloc mem:%p", temp);
} else {
tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
}
}
return pHead;
}
int32_t rpcInit(void) {
// impl later
return -1;
}
void rpcCleanup(void) {
// impl later
return;
}
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册