diff --git a/src/mnode/src/mnodeCluster.c b/src/mnode/src/mnodeCluster.c index 94ec59b1b085208349d17dfa6ed203b36ebbf61b..169d2ebd9dae1bcc34ea24dd3770951f9cbb442b 100644 --- a/src/mnode/src/mnodeCluster.c +++ b/src/mnode/src/mnodeCluster.c @@ -138,14 +138,6 @@ void mnodeDecClusterRef(SClusterObj *pCluster) { sdbDecRef(tsClusterSdb, pCluster); } -#ifdef __APPLE__ -bool taosGetSystemUid(char *uid) { - fprintf(stderr, "%s[%d]%s(): not implemented yet!\n", basename(__FILE__), __LINE__, __func__); - abort(); - return false; -} -#endif // __APPLE__ - static int32_t mnodeCreateCluster() { int64_t numOfClusters = sdbGetNumOfRows(tsClusterSdb); if (numOfClusters != 0) return TSDB_CODE_SUCCESS; diff --git a/src/os/src/darwin/darwinSemphone.c b/src/os/src/darwin/darwinSemphone.c index a9e470bd7019c84d8c8c8368da242cbc4a1d618b..6df44a52b36505c7d1407b119e2da5c2494e2437 100644 --- a/src/os/src/darwin/darwinSemphone.c +++ b/src/os/src/darwin/darwinSemphone.c @@ -134,7 +134,7 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) { errno = ENOMEM; return -1; } - kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, 0); + kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value); if (ret != KERN_SUCCESS) { fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", basename(__FILE__), __LINE__, __func__, sem); // we fail-fast here, because we have less-doc about semaphore_create for the moment diff --git a/src/os/src/darwin/darwinSysInfo.c b/src/os/src/darwin/darwinSysInfo.c index ad3facdcee50775aeea0794b60c0d9f82d1de538..cf799d7f55a0b6c084bb563e4cb00ec256bbc8e3 100644 --- a/src/os/src/darwin/darwinSysInfo.c +++ b/src/os/src/darwin/darwinSysInfo.c @@ -103,4 +103,15 @@ int taosSystem(const char *cmd) { void taosSetCoreDump() {} -char *taosGetCmdlineByPID(int pid) { return ""; } \ No newline at end of file +char *taosGetCmdlineByPID(int pid) { + return "[not supported yet]"; +} + +bool taosGetSystemUid(char *uid) { + uuid_t uuid = {0}; + uuid_generate(uuid); + // it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null + uuid_unparse(uuid, uid); + return true; +} + diff --git a/src/plugins/http/src/httpUtil.c b/src/plugins/http/src/httpUtil.c index 7f1e2a94d1918151277f46670f54681d64e8b56f..4fdd755b40d0f76d1c36cdd42a795d2484f7b4b5 100644 --- a/src/plugins/http/src/httpUtil.c +++ b/src/plugins/http/src/httpUtil.c @@ -37,7 +37,7 @@ void httpTimeToString(time_t t, char *buf, int32_t buflen) { time_t tt = t / 1000; ptm = localtime(&tt); strftime(ts, 31, "%Y-%m-%d %H:%M:%S", ptm); - sprintf(buf, "%s.%03" PRId64, ts, t % 1000); + sprintf(buf, "%s.%03" PRId64, ts, (int64_t)(t % 1000)); } int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...) { diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index f3e69d92103b691b934bf0dee8ccbde324da89fa..05335b45d5850033918c1af503f3d505af2813c0 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -233,11 +233,7 @@ typedef struct { SMemTable* mem; SMemTable* imem; STsdbFileH* tsdbFileH; -#ifdef __APPLE__ - sem_t *readyToCommit; -#else // __APPLE__ - sem_t readyToCommit; -#endif // __APPLE__ + tsem_t readyToCommit; pthread_mutex_t mutex; bool repoLocked; int32_t code; // Commit code diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index b3f7f51327915831830a3e3e14940eab7c9b6478..cd8358e5e39c0b876841423d7881c3ee5ca7913b 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -166,11 +166,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { pRepo->imem = NULL; tsdbUnlockRepo(pRepo); tsdbUnRefMemTable(pRepo, pIMem); -#ifdef __APPLE__ - sem_post(pRepo->readyToCommit); -#else // __APPLE__ - sem_post(&(pRepo->readyToCommit)); -#endif // __APPLE__ + tsem_post(&(pRepo->readyToCommit)); } static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index adab932bb607e7b8eaa79c816dcab6e89e9d9392..cadbfa91cf51abac00bcb0d77605ac96346ec46c 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -146,11 +146,7 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { if (toCommit) { tsdbAsyncCommit(pRepo); -#ifdef __APPLE__ - sem_wait(pRepo->readyToCommit); -#else // __APPLE__ - sem_wait(&(pRepo->readyToCommit)); -#endif // __APPLE__ + tsem_wait(&(pRepo->readyToCommit)); terrno = pRepo->code; } tsdbUnRefMemTable(pRepo, pRepo->mem); @@ -647,21 +643,12 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { goto _err; } -#ifdef __APPLE__ - pRepo->readyToCommit = sem_open(NULL, O_CREAT, 0644, 1); - if (pRepo->readyToCommit==SEM_FAILED) { - code = errno; - terrno = TAOS_SYSTEM_ERROR(code); - goto _err; - } -#else // __APPLE__ - code = sem_init(&(pRepo->readyToCommit), 0, 1); + code = tsem_init(&(pRepo->readyToCommit), 0, 1); if (code != 0) { code = errno; terrno = TAOS_SYSTEM_ERROR(code); goto _err; } -#endif // __APPLE__ pRepo->repoLocked = false; @@ -707,11 +694,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { // tsdbFreeMemTable(pRepo->mem); // tsdbFreeMemTable(pRepo->imem); tfree(pRepo->rootDir); -#ifdef __APPLE__ - sem_close(pRepo->readyToCommit); -#else // __APPLE__ - sem_destroy(&(pRepo->readyToCommit)); -#endif // __APPLE__ + tsem_destroy(&(pRepo->readyToCommit)); pthread_mutex_destroy(&pRepo->mutex); free(pRepo); } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 206a3332d57e4db3f4ce6fae4421298fd0736865..42bbebe5f7ae319e45cfd37bea6c1dd673be7904 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -207,11 +207,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { int tsdbAsyncCommit(STsdbRepo *pRepo) { if (pRepo->mem == NULL) return 0; -#ifdef __APPLE__ - sem_wait(pRepo->readyToCommit); -#else // __APPLE__ - sem_wait(&(pRepo->readyToCommit)); -#endif // __APPLE__ + tsem_wait(&(pRepo->readyToCommit)); ASSERT(pRepo->imem == NULL); @@ -233,13 +229,8 @@ int tsdbSyncCommit(TSDB_REPO_T *repo) { STsdbRepo *pRepo = (STsdbRepo *)repo; tsdbAsyncCommit(pRepo); -#ifdef __APPLE__ - sem_wait(pRepo->readyToCommit); - sem_post(pRepo->readyToCommit); -#else // __APPLE__ - sem_wait(&(pRepo->readyToCommit)); - sem_post(&(pRepo->readyToCommit)); -#endif // __APPLE__ + tsem_wait(&(pRepo->readyToCommit)); + tsem_post(&(pRepo->readyToCommit)); if (pRepo->code != TSDB_CODE_SUCCESS) { terrno = pRepo->code; diff --git a/tests/examples/c/epoll.c b/tests/examples/c/epoll.c index 4f65ea478e21dbf7d5fe067bfc5b142c410ad666..20ab3951586d353909090cb1344fad10d0362ddf 100644 --- a/tests/examples/c/epoll.c +++ b/tests/examples/c/epoll.c @@ -13,6 +13,12 @@ * along with this program. If not, see . */ +// how to use to do a pressure-test upon eok +// tester: cat /dev/urandom | nc -c +// testee: ./debug/build/bin/epoll -l > /dev/null +// compare against: nc -l > /dev/null +// monitor and compare : glances + #ifdef __APPLE__ #include "eok.h" #else // __APPLE__ @@ -68,55 +74,62 @@ static int ep_dummy = 0; static ep_t* ep_create(void); static void ep_destroy(ep_t *ep); static void* routine(void* arg); -static int open_connect(unsigned short port); static int open_listen(unsigned short port); -typedef struct client_s client_t; -struct client_s { +typedef struct fde_s fde_t; +struct fde_s { int skt; - void (*on_event)(ep_t *ep, struct epoll_event *events, client_t *client); - volatile unsigned int state; // 1: listenning; 2: connected + void (*on_event)(ep_t *ep, struct epoll_event *events, fde_t *client); }; -static void echo_event(ep_t *ep, struct epoll_event *ev, client_t *client); +static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client); +static void null_event(ep_t *ep, struct epoll_event *ev, fde_t *client); + +#define usage(arg0, fmt, ...) do { \ + if (fmt[0]) { \ + fprintf(stderr, "" fmt "\n", ##__VA_ARGS__); \ + } \ + fprintf(stderr, "usage:\n"); \ + fprintf(stderr, " %s -l : specify listenning port\n", arg0); \ +} while (0) int main(int argc, char *argv[]) { + char *prg = basename(argv[0]); + if (argc==1) { + usage(prg, ""); + return 0; + } ep_t* ep = ep_create(); A(ep, "failed"); - int skt = open_connect(6789); - if (skt!=-1) { - client_t *client = (client_t*)calloc(1, sizeof(*client)); - if (client) { - client->skt = skt; - client->on_event = echo_event; - client->state = 2; - struct epoll_event ev = {0}; - ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; - ev.data.ptr = client; - A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); - } - } - skt = open_listen(0); - if (skt!=-1) { - client_t *client = (client_t*)calloc(1, sizeof(*client)); - if (client) { + for (int i=1; i=argc) { + usage(prg, "expecting after -l, but got nothing"); + return 1; // confirmed potential leakage + } + arg = argv[i]; + int port = atoi(arg); + int skt = open_listen(port); + if (skt==-1) continue; + fde_t *client = (fde_t*)calloc(1, sizeof(*client)); + if (!client) { + E("out of memory"); + close(skt); + continue; + } client->skt = skt; - client->on_event = echo_event; - client->state = 1; + client->on_event = listen_event; struct epoll_event ev = {0}; ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; ev.data.ptr = client; A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); + continue; } + usage(prg, "unknown argument: [%s]", arg); + return 1; } - // char c = '\0'; - // while ((c=getchar())!=EOF) { - // switch (c) { - // case 'q': break; - // default: continue; - // } - // } - // getchar(); char *line = NULL; size_t linecap = 0; ssize_t linelen; @@ -205,7 +218,7 @@ static void* routine(void* arg) { continue; } A(ev->data.ptr, "internal logic error"); - client_t *client = (client_t*)ev->data.ptr; + fde_t *client = (fde_t*)ev->data.ptr; client->on_event(ep, ev, client); continue; } @@ -223,7 +236,7 @@ static int open_listen(unsigned short port) { do { struct sockaddr_in si = {0}; si.sin_family = AF_INET; - si.sin_addr.s_addr = inet_addr("127.0.0.1"); + si.sin_addr.s_addr = inet_addr("0.0.0.0"); si.sin_port = htons(port); r = bind(skt, (struct sockaddr*)&si, sizeof(si)); if (r) { @@ -249,63 +262,31 @@ static int open_listen(unsigned short port) { return -1; } -static int open_connect(unsigned short port) { - int r = 0; - int skt = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (skt==-1) { - E("socket() failed"); - return -1; +static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client) { + A(ev->events & EPOLLIN, "internal logic error"); + struct sockaddr_in si = {0}; + socklen_t silen = sizeof(si); + int skt = accept(client->skt, (struct sockaddr*)&si, &silen); + A(skt!=-1, "internal logic error"); + fde_t *server = (fde_t*)calloc(1, sizeof(*server)); + if (!server) { + close(skt); + return; } - do { - struct sockaddr_in si = {0}; - si.sin_family = AF_INET; - si.sin_addr.s_addr = inet_addr("127.0.0.1"); - si.sin_port = htons(port); - r = connect(skt, (struct sockaddr*)&si, sizeof(si)); - if (r) { - E("connect(%u) failed", port); - break; - } - memset(&si, 0, sizeof(si)); - socklen_t len = sizeof(si); - r = getsockname(skt, (struct sockaddr *)&si, &len); - if (r) { - E("getsockname() failed"); - } - A(len==sizeof(si), "internal logic error"); - D("connected: %d", ntohs(si.sin_port)); - return skt; - } while (0); - close(skt); - return -1; + server->skt = skt; + server->on_event = null_event; + struct epoll_event ee = {0}; + ee.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; + ee.data.ptr = server; + A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ee), ""); } -static void echo_event(ep_t *ep, struct epoll_event *ev, client_t *client) { +static void null_event(ep_t *ep, struct epoll_event *ev, fde_t *client) { if (ev->events & EPOLLIN) { - if (client->state==1) { - struct sockaddr_in si = {0}; - socklen_t silen = sizeof(si); - int skt = accept(client->skt, (struct sockaddr*)&si, &silen); - if (skt!=-1) { - client_t *server = (client_t*)calloc(1, sizeof(*server)); - if (server) { - server->skt = skt; - server->on_event = echo_event; - server->state = 2; - struct epoll_event ev = {0}; - ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; - ev.data.ptr = server; - A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); - } - } - } - if (client->state==2) { - char buf[4]; - int n = recv(client->skt, buf, sizeof(buf)-1, 0); - A(n>=0 && nevents, buf); - } + char buf[8192]; + int n = recv(client->skt, buf, sizeof(buf), 0); + A(n>=0 && n<=sizeof(buf), "internal logic error:[%d]", n); + A(n==fwrite(buf, 1, n, stdout), "internal logic error"); } if (ev->events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { A(0==pthread_mutex_lock(&ep->lock), "");