提交 95e353a1 编写于 作者: dengyihao's avatar dengyihao

add client

上级 81f60703
...@@ -101,6 +101,13 @@ typedef struct SThreadObj { ...@@ -101,6 +101,13 @@ typedef struct SThreadObj {
void* shandle; void* shandle;
} SThreadObj; } SThreadObj;
typedef struct SClientObj {
char label[TSDB_LABEL_LEN];
int32_t index;
int numOfThreads;
SThreadObj** pThreadObj;
} SClientObj;
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) #define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) #define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
...@@ -113,7 +120,7 @@ typedef struct SServerObj { ...@@ -113,7 +120,7 @@ typedef struct SServerObj {
uv_tcp_t server; uv_tcp_t server;
uv_loop_t* loop; uv_loop_t* loop;
int workerIdx; int workerIdx;
int numOfThread; int numOfThreads;
SThreadObj** pThreadObj; SThreadObj** pThreadObj;
uv_pipe_t** pipe; uv_pipe_t** pipe;
uint32_t ip; uint32_t ip;
...@@ -179,18 +186,37 @@ static void* acceptThread(void* arg); ...@@ -179,18 +186,37 @@ static void* acceptThread(void* arg);
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {taosInitServer, taosInitClient};
void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SClientObj* cli = calloc(1, sizeof(SClientObj));
memcpy(cli->label, label, strlen(label));
cli->numOfThreads = numOfThreads;
cli->pThreadObj = (SThreadObj**)calloc(cli->numOfThreads, sizeof(SThreadObj*));
for (int i = 0; i < cli->numOfThreads; i++) {
SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj));
int err = pthread_create(&thrd->thread, NULL, workerThread, (void*)(thrd));
if (err == 0) {
tDebug("sucess to create tranport-client thread %d", i);
}
}
return cli;
}
void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
SServerObj* srv = calloc(1, sizeof(SServerObj)); SServerObj* srv = calloc(1, sizeof(SServerObj));
srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
srv->numOfThread = numOfThreads; srv->numOfThreads = numOfThreads;
srv->workerIdx = 0; srv->workerIdx = 0;
srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*)); srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThreads, sizeof(SThreadObj*));
srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*)); srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*));
srv->ip = ip; srv->ip = ip;
srv->port = port; srv->port = port;
uv_loop_init(srv->loop); uv_loop_init(srv->loop);
for (int i = 0; i < srv->numOfThread; i++) { for (int i = 0; i < srv->numOfThreads; i++) {
SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj));
srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
int fds[2]; int fds[2];
...@@ -299,8 +325,7 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) { ...@@ -299,8 +325,7 @@ static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) {
if (!rpcIsReq(pHead->msgType)) { if (!rpcIsReq(pHead->msgType)) {
// for response, if code is auth failure, it shall bypass the auth process // for response, if code is auth failure, it shall bypass the auth process
code = htonl(pHead->code); code = htonl(pHead->code);
if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
...@@ -460,7 +485,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { ...@@ -460,7 +485,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb);
} else { } else {
...@@ -587,7 +612,9 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -587,7 +612,9 @@ void* rpcOpen(const SRpcInit* pInit) {
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
} }
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); pRpc->connType = pInit->connType;
pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
// pRpc->taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
return pRpc; return pRpc;
} }
void rpcClose(void* arg) { return; } void rpcClose(void* arg) { return; }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册