diff --git a/source/libs/transport/CMakeLists.txt b/source/libs/transport/CMakeLists.txt index c4eeef5df20f23e8d2c557bc7e91a8ce2442032c..61d781210c7eb0239ce447a757a9a25c54a6775d 100644 --- a/source/libs/transport/CMakeLists.txt +++ b/source/libs/transport/CMakeLists.txt @@ -27,4 +27,11 @@ if (${BUILD_WITH_UV}) add_definitions(-DUSE_UV) endif(${BUILD_WITH_UV}) +if (${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) + + + + diff --git a/source/libs/transport/inc/rpcHead.h b/source/libs/transport/inc/rpcHead.h index 7317d84af1671c06452b6b3f4525675414e160a5..66821db133241ebf16ffb299c401171278e678eb 100644 --- a/source/libs/transport/inc/rpcHead.h +++ b/source/libs/transport/inc/rpcHead.h @@ -22,6 +22,27 @@ extern "C" { #endif #ifdef USE_UV +typedef struct { + char version : 4; // RPC version + char comp : 4; // compression algorithm, 0:no compression 1:lz4 + char resflag : 2; // reserved bits + char spi : 3; // security parameter index + char encrypt : 3; // encrypt algorithm, 0: no encryption + uint16_t tranId; // transcation ID + uint32_t linkUid; // for unique connection ID assigned by client + uint64_t ahandle; // ahandle assigned by client + uint32_t sourceId; // source ID, an index for connection list + uint32_t destId; // destination ID, an index for connection list + uint32_t destIp; // destination IP address, for NAT scenario + char user[TSDB_UNI_LEN]; // user ID + uint16_t port; // for UDP only, port may be changed + char empty[1]; // reserved + uint16_t msgType; // message type + int32_t msgLen; // message length including the header iteslf + uint32_t msgVer; + int32_t code; // code in response message + uint8_t content[0]; // message body starts from here +} SRpcHead; #else diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 9809f7ee1af24f3cbe426c7e67f62f7f0ebebb42..067b371b8474f69833172793dbdf5a27d5797748 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -22,9 +22,58 @@ extern "C" { #ifdef USE_UV -#else +#include +typedef void *queue[2]; + +/* Private macros. */ +#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0])) +#define QUEUE_PREV(q) (*(queue **)&((*(q))[1])) + +#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) +#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) + +/* Initialize an empty queue. */ +#define QUEUE_INIT(q) \ + { \ + QUEUE_NEXT(q) = (q); \ + QUEUE_PREV(q) = (q); \ + } + +/* Return true if the queue has no element. */ +#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q)) + +/* Insert an element at the back of a queue. */ +#define QUEUE_PUSH(q, e) \ + { \ + QUEUE_NEXT(e) = (q); \ + QUEUE_PREV(e) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(e) = (e); \ + QUEUE_PREV(q) = (e); \ + } + +/* Remove the given element from the queue. Any element can be removed at any * + * time. */ +#define QUEUE_REMOVE(e) \ + { \ + QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ + QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ + } + +/* Return the element at the front of the queue. */ +#define QUEUE_HEAD(q) (QUEUE_NEXT(q)) + +/* Return the element at the back of the queue. */ +#define QUEUE_TAIL(q) (QUEUE_PREV(q)) + +/* Iterate over the element of a queue. * Mutating the queue while iterating + * results in undefined behavior. */ +#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q)) + +/* Return the structure holding the given element. */ +#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field)))) + +#endif // USE_LIBUV -#endif #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 3095ddb9d254e67692a26874040c7ad3580091d7..a1c0c05fc3dba25935b67eeef71b605bab4d706d 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -30,6 +30,7 @@ #include "tmd5.h" #include "tmempool.h" #include "tmsg.h" +#include "transportInt.h" #include "tref.h" #include "trpc.h" #include "ttimer.h" @@ -69,61 +70,87 @@ typedef struct { #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) +static const char* notify = "a"; + typedef struct SThreadObj { - pthread_t thread; - uv_pipe_t* pipe; - uv_loop_t* loop; - uv_async_t* workerAsync; // - int fd; + pthread_t thread; + uv_pipe_t* pipe; + uv_loop_t* loop; + uv_async_t* workerAsync; // + int fd; + queue conn; + pthread_mutex_t connMtx; } SThreadObj; typedef struct SServerObj { + pthread_t thread; uv_tcp_t server; uv_loop_t* loop; int workerIdx; int numOfThread; SThreadObj** pThreadObj; uv_pipe_t** pipe; + uint32_t ip; + uint32_t port; } SServerObj; +typedef struct SContent { + char* buf; + int len; + int cap; + int toRead; +} SContent; + typedef struct SConnCtx { - uv_tcp_t* pClient; + uv_tcp_t* pTcp; + uv_write_t* pWriter; uv_timer_t* pTimer; + uv_async_t* pWorkerAsync; + queue queue; int ref; + int persist; // persist connection or not + SContent pCont; + int count; } SConnCtx; -static void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void onTimeout(uv_timer_t* handle); -static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); -static void onWrite(uv_write_t* req, int status); -static void onAccept(uv_stream_t* stream, int status); -void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); -static void workerAsyncCB(uv_async_t* handle); +static void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void onTimeout(uv_timer_t* handle); +static void onWrite(uv_write_t* req, int status); +static void onAccept(uv_stream_t* stream, int status); +static void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); +static void workerAsyncCB(uv_async_t* handle); + +static SConnCtx* connCtxCreate(); +static void connCtxDestroy(SConnCtx* ctx); +static void uvConnCtxDestroy(uv_handle_t* handle); + static void* workerThread(void* arg); +static void* acceptThread(void* arg); + +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); int32_t rpcInit() { return -1; } void rpcCleanup() { return; }; -void* rpcOpen(const SRpcInit* pInit) { - SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); - if (pRpc == NULL) { - return NULL; - } - if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); - } - pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; +void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + // opte +} +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SServerObj* srv = calloc(1, sizeof(SServerObj)); srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - srv->numOfThread = pRpc->numOfThreads; + srv->numOfThread = numOfThreads; srv->workerIdx = 0; srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*)); srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*)); + srv->ip = ip; + srv->port = port; uv_loop_init(srv->loop); for (int i = 0; i < srv->numOfThread; i++) { - srv->pThreadObj[i] = (SThreadObj*)calloc(1, sizeof(SThreadObj)); + SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); int fds[2]; if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { @@ -132,28 +159,38 @@ void* rpcOpen(const SRpcInit* pInit) { uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - srv->pThreadObj[i]->fd = fds[0]; - srv->pThreadObj[i]->pipe = &(srv->pipe[i][1]); // init read - int err = pthread_create(&(srv->pThreadObj[i]->thread), NULL, workerThread, (void*)(srv->pThreadObj[i])); + thrd->fd = fds[0]; + thrd->pipe = &(srv->pipe[i][1]); // init read + int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); if (err == 0) { - tError("sucess to create worker thread %d", i); + tDebug("sucess to create worker-thread %d", i); // printf("thread %d create\n", i); } else { - tError("failed to create worker thread %d", i); - return NULL; + // TODO: clear all other resource later + tError("failed to create worker-thread %d", i); } + srv->pThreadObj[i] = thrd; } - uv_tcp_init(srv->loop, &srv->server); - struct sockaddr_in bind_addr; - uv_ip4_addr("0.0.0.0", pInit->localPort, &bind_addr); - uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); - int err = 0; - if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) { - tError("Listen error %s\n", uv_err_name(err)); - return NULL; + + int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); + if (err == 0) { + tDebug("success to create accept-thread"); + } else { + // clear all resource later } - uv_run(srv->loop, UV_RUN_DEFAULT); + return srv; +} +void* rpcOpen(const SRpcInit* pInit) { + SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); + if (pRpc == NULL) { + return NULL; + } + if (pInit->label) { + tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); + } + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); return pRpc; } void rpcClose(void* arg) { return; } @@ -171,50 +208,150 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } void rpcCancelRequest(int64_t rid) { return; } -void allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - buf->base = malloc(suggested_size); - buf->len = suggested_size; +void allocReadBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + static const int CAPACITY = 1024; + tDebug("pre alloc buffer for read "); + SConnCtx* ctx = handle->data; + SContent* pCont = &ctx->pCont; + if (pCont->cap == 0) { + pCont->buf = (char*)calloc(CAPACITY, sizeof(char)); + pCont->len = 0; + pCont->cap = CAPACITY; + pCont->toRead = -1; + + buf->base = pCont->buf; + buf->len = CAPACITY; + } else { + if (pCont->len >= pCont->cap) { + if (pCont->toRead == -1) { + pCont->cap *= 2; + pCont->buf = realloc(pCont->buf, pCont->cap); + } else if (pCont->len + pCont->toRead > pCont->cap) { + pCont->cap = pCont->len + pCont->toRead; + pCont->buf = realloc(pCont->buf, pCont->len + pCont->toRead); + } + } + buf->base = pCont->buf + pCont->len; + buf->len = pCont->cap - pCont->len; + } + + // if (ctx->pCont.cap == 0) { + // ctx->pCont.buf = (char*)calloc(64, sizeof(char)); + // ctx->pCont.len = 0; + // ctx->pCont.cap = 64; + // // + // buf->base = ctx->pCont.buf; + // buf->len = sz; + //} else { + // if (ctx->pCont.len + sz > ctx->pCont.cap) { + // ctx->pCont.cap *= 2; + // ctx->pCont.buf = realloc(ctx->pCont.buf, ctx->pCont.cap); + // } + // buf->base = ctx->pCont.buf + ctx->pCont.len; + // buf->len = sz; + //} +} +// change later +static bool handleUserData(SContent* data) { + SRpcHead rpcHead; + + bool finish = false; + int32_t msgLen, leftLen, retLen; + int32_t headLen = sizeof(rpcHead); + if (data->len >= headLen) { + memcpy((char*)&rpcHead, data->buf, headLen); + msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); + if (msgLen + headLen <= data->len) { + return true; + } else { + return false; + } + } else { + return false; + } } -void onTimeout(uv_timer_t* handle) { +void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // opt - tDebug("time out"); + SConnCtx* ctx = cli->data; + SContent* pCont = &ctx->pCont; + if (nread > 0) { + pCont->len += nread; + bool finish = handleUserData(pCont); + if (finish == false) { + tDebug("continue read"); + } else { + tDebug("read completely"); + } + return; + } + + if (nread != UV_EOF) { + tDebug("Read error %s\n", uv_err_name(nread)); + } + uv_close((uv_handle_t*)cli, uvConnCtxDestroy); } -void onRead(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { +void allocConnBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + buf->base = malloc(sizeof(char)); + buf->len = 2; +} + +void onTimeout(uv_timer_t* handle) { // opt - tDebug("data already was read on a stream"); + tDebug("time out"); } void onWrite(uv_write_t* req, int status) { + SConnCtx* ctx = req->data; + if (status == 0) { + tDebug("data already was written on stream"); + } else { + connCtxDestroy(ctx); + } // opt - if (req) tDebug("data already was written on stream"); } void workerAsyncCB(uv_async_t* handle) { - // opt SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync); + SConnCtx* conn = NULL; + + // opt later + pthread_mutex_lock(&pObj->connMtx); + if (!QUEUE_IS_EMPTY(&pObj->conn)) { + queue* head = QUEUE_HEAD(&pObj->conn); + conn = QUEUE_DATA(head, SConnCtx, queue); + QUEUE_REMOVE(&conn->queue); + } + pthread_mutex_unlock(&pObj->connMtx); + if (conn == NULL) { + tError("except occurred, do nothing"); + return; + } } + void onAccept(uv_stream_t* stream, int status) { if (status == -1) { return; } SServerObj* pObj = container_of(stream, SServerObj, server); - tDebug("new conntion accepted by main server, dispatch to one worker thread"); uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pObj->loop, cli); + if (uv_accept(stream, (uv_stream_t*)cli) == 0) { uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); - uv_buf_t buf = uv_buf_init("a", 1); - // despatch to worker thread + uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); + pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; + tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, onWrite); } else { uv_close((uv_handle_t*)cli, NULL); } } void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { + tDebug("connection coming"); if (nread < 0) { if (nread != UV_EOF) { tError("read error %s", uv_err_name(nread)); @@ -223,6 +360,11 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_close((uv_handle_t*)q, NULL); return; } + // free memory allocated by + assert(nread == strlen(notify)); + assert(buf->base[0] == notify[0]); + free(buf->base); + SThreadObj* pObj = (SThreadObj*)container_of(q, struct SThreadObj, pipe); uv_pipe_t* pipe = (uv_pipe_t*)q; @@ -230,46 +372,90 @@ void onConnection(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tError("No pending count"); return; } + uv_handle_type pending = uv_pipe_pending_type(pipe); assert(pending == UV_TCP); - SConnCtx* pConn = malloc(sizeof(SConnCtx)); + SConnCtx* pConn = connCtxCreate(); /* init conn timer*/ pConn->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pObj->loop, pConn->pTimer); - pConn->pClient = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); pConn->pWorkerAsync = pObj->workerAsync; // thread safty - uv_tcp_init(pObj->loop, pConn->pClient); - if (uv_accept(q, (uv_stream_t*)(pConn->pClient)) == 0) { + // init client handle + pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pObj->loop, pConn->pTcp); + pConn->pTcp->data = pConn; + + // init write request, just + pConn->pWriter = calloc(1, sizeof(uv_write_t)); + pConn->pWriter->data = pConn; + + if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; - uv_fileno((const uv_handle_t*)pConn->pClient, &fd); + uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); tDebug("new connection created: %d", fd); - uv_timer_start(pConn->pTimer, onTimeout, 10, 0); - uv_read_start((uv_stream_t*)(pConn->pClient), allocBuffer, onRead); + uv_read_start((uv_stream_t*)(pConn->pTcp), allocReadBuffer, onRead); } else { - uv_timer_stop(pConn->pTimer); - free(pConn->pTimer); - uv_close((uv_handle_t*)pConn->pClient, NULL); - free(pConn->pClient); - free(pConn); + connCtxDestroy(pConn); } } +void* acceptThread(void* arg) { + // opt + SServerObj* srv = (SServerObj*)arg; + uv_tcp_init(srv->loop, &srv->server); + + struct sockaddr_in bind_addr; + + uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); + uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); + int err = 0; + if ((err = uv_listen((uv_stream_t*)&srv->server, 128, onAccept)) != 0) { + tError("Listen error %s\n", uv_err_name(err)); + return NULL; + } + uv_run(srv->loop, UV_RUN_DEFAULT); +} void* workerThread(void* arg) { SThreadObj* pObj = (SThreadObj*)arg; - int fd = pObj->fd; + pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pObj->loop); uv_pipe_init(pObj->loop, pObj->pipe, 1); - uv_pipe_open(pObj->pipe, fd); + uv_pipe_open(pObj->pipe, pObj->fd); + + QUEUE_INIT(&pObj->conn); pObj->workerAsync = malloc(sizeof(uv_async_t)); uv_async_init(pObj->loop, pObj->workerAsync, workerAsyncCB); - uv_read_start((uv_stream_t*)pObj->pipe, allocBuffer, onConnection); + + uv_read_start((uv_stream_t*)pObj->pipe, allocConnBuffer, onConnection); + uv_run(pObj->loop, UV_RUN_DEFAULT); } +static SConnCtx* connCtxCreate() { + SConnCtx* pConn = (SConnCtx*)calloc(1, sizeof(SConnCtx)); + return pConn; +} +static void connCtxDestroy(SConnCtx* ctx) { + if (ctx == NULL) { + return; + } + uv_timer_stop(ctx->pTimer); + free(ctx->pTimer); + uv_close((uv_handle_t*)ctx->pTcp, NULL); + free(ctx->pTcp); + free(ctx->pWriter); + free(ctx); + // handle +} +static void uvConnCtxDestroy(uv_handle_t* handle) { + SConnCtx* ctx = handle->data; + connCtxDestroy(ctx); +} + #else #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) @@ -442,7 +628,8 @@ void *rpcOpen(const SRpcInit *pInit) { pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) return NULL; - if (pInit->label) tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); + if (pInit->label) tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); + pRpc->connType = pInit->connType; if (pRpc->connType == TAOS_CONN_CLIENT) { pRpc->numOfThreads = pInit->numOfThreads; diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..9e58bf08cdeb27ed6f6c6a187fc156bd586dd2a4 --- /dev/null +++ b/source/libs/transport/test/CMakeLists.txt @@ -0,0 +1,21 @@ +add_executable(transportTest "") +target_sources(transportTest + PRIVATE + "transportTests.cc" +) + +target_include_directories(transportTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/transport" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) + +target_link_libraries (transportTest + os + util + common + gtest_main + transport +) + + diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc new file mode 100644 index 0000000000000000000000000000000000000000..468aeba8a9adad92d730bc251b3416eddea7e60f --- /dev/null +++ b/source/libs/transport/test/transportTests.cc @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include + +#include "transportInt.h" +#include "trpc.h" + +using namespace std; + +int main() { + SRpcInit init = {.localPort = 6030, .label = "rpc", .numOfThreads = 5}; + void* p = rpcOpen(&init); + + while (1) { + std::cout << "cron task" << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); + } +} diff --git a/source/libs/transport/test/transportTests.cpp b/source/libs/transport/test/transportTests.cpp deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000