未验证 提交 c3755720 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Merge pull request #5028 from freemine/mac

bugfix on mac and pressure-test
...@@ -138,14 +138,6 @@ void mnodeDecClusterRef(SClusterObj *pCluster) { ...@@ -138,14 +138,6 @@ void mnodeDecClusterRef(SClusterObj *pCluster) {
sdbDecRef(tsClusterSdb, pCluster); sdbDecRef(tsClusterSdb, pCluster);
} }
#ifdef __APPLE__
bool taosGetSystemUid(char *uid) {
fprintf(stderr, "%s[%d]%s(): not implemented yet!\n", basename(__FILE__), __LINE__, __func__);
abort();
return false;
}
#endif // __APPLE__
static int32_t mnodeCreateCluster() { static int32_t mnodeCreateCluster() {
int64_t numOfClusters = sdbGetNumOfRows(tsClusterSdb); int64_t numOfClusters = sdbGetNumOfRows(tsClusterSdb);
if (numOfClusters != 0) return TSDB_CODE_SUCCESS; if (numOfClusters != 0) return TSDB_CODE_SUCCESS;
......
...@@ -134,7 +134,7 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) { ...@@ -134,7 +134,7 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
errno = ENOMEM; errno = ENOMEM;
return -1; return -1;
} }
kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, 0); kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value);
if (ret != KERN_SUCCESS) { if (ret != KERN_SUCCESS) {
fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", basename(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", basename(__FILE__), __LINE__, __func__, sem);
// we fail-fast here, because we have less-doc about semaphore_create for the moment // we fail-fast here, because we have less-doc about semaphore_create for the moment
......
...@@ -103,4 +103,15 @@ int taosSystem(const char *cmd) { ...@@ -103,4 +103,15 @@ int taosSystem(const char *cmd) {
void taosSetCoreDump() {} void taosSetCoreDump() {}
char *taosGetCmdlineByPID(int pid) { return ""; } char *taosGetCmdlineByPID(int pid) {
\ No newline at end of file return "[not supported yet]";
}
bool taosGetSystemUid(char *uid) {
uuid_t uuid = {0};
uuid_generate(uuid);
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
uuid_unparse(uuid, uid);
return true;
}
...@@ -37,7 +37,7 @@ void httpTimeToString(time_t t, char *buf, int32_t buflen) { ...@@ -37,7 +37,7 @@ void httpTimeToString(time_t t, char *buf, int32_t buflen) {
time_t tt = t / 1000; time_t tt = t / 1000;
ptm = localtime(&tt); ptm = localtime(&tt);
strftime(ts, 31, "%Y-%m-%d %H:%M:%S", ptm); strftime(ts, 31, "%Y-%m-%d %H:%M:%S", ptm);
sprintf(buf, "%s.%03" PRId64, ts, t % 1000); sprintf(buf, "%s.%03" PRId64, ts, (int64_t)(t % 1000));
} }
int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...) { int32_t httpAddToSqlCmdBuffer(HttpContext *pContext, const char *const format, ...) {
......
...@@ -233,11 +233,7 @@ typedef struct { ...@@ -233,11 +233,7 @@ typedef struct {
SMemTable* mem; SMemTable* mem;
SMemTable* imem; SMemTable* imem;
STsdbFileH* tsdbFileH; STsdbFileH* tsdbFileH;
#ifdef __APPLE__ tsem_t readyToCommit;
sem_t *readyToCommit;
#else // __APPLE__
sem_t readyToCommit;
#endif // __APPLE__
pthread_mutex_t mutex; pthread_mutex_t mutex;
bool repoLocked; bool repoLocked;
int32_t code; // Commit code int32_t code; // Commit code
......
...@@ -166,11 +166,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { ...@@ -166,11 +166,7 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
pRepo->imem = NULL; pRepo->imem = NULL;
tsdbUnlockRepo(pRepo); tsdbUnlockRepo(pRepo);
tsdbUnRefMemTable(pRepo, pIMem); tsdbUnRefMemTable(pRepo, pIMem);
#ifdef __APPLE__ tsem_post(&(pRepo->readyToCommit));
sem_post(pRepo->readyToCommit);
#else // __APPLE__
sem_post(&(pRepo->readyToCommit));
#endif // __APPLE__
} }
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
......
...@@ -146,11 +146,7 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { ...@@ -146,11 +146,7 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
if (toCommit) { if (toCommit) {
tsdbAsyncCommit(pRepo); tsdbAsyncCommit(pRepo);
#ifdef __APPLE__ tsem_wait(&(pRepo->readyToCommit));
sem_wait(pRepo->readyToCommit);
#else // __APPLE__
sem_wait(&(pRepo->readyToCommit));
#endif // __APPLE__
terrno = pRepo->code; terrno = pRepo->code;
} }
tsdbUnRefMemTable(pRepo, pRepo->mem); tsdbUnRefMemTable(pRepo, pRepo->mem);
...@@ -647,21 +643,12 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { ...@@ -647,21 +643,12 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
goto _err; goto _err;
} }
#ifdef __APPLE__ code = tsem_init(&(pRepo->readyToCommit), 0, 1);
pRepo->readyToCommit = sem_open(NULL, O_CREAT, 0644, 1);
if (pRepo->readyToCommit==SEM_FAILED) {
code = errno;
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
#else // __APPLE__
code = sem_init(&(pRepo->readyToCommit), 0, 1);
if (code != 0) { if (code != 0) {
code = errno; code = errno;
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
goto _err; goto _err;
} }
#endif // __APPLE__
pRepo->repoLocked = false; pRepo->repoLocked = false;
...@@ -707,11 +694,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { ...@@ -707,11 +694,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
// tsdbFreeMemTable(pRepo->mem); // tsdbFreeMemTable(pRepo->mem);
// tsdbFreeMemTable(pRepo->imem); // tsdbFreeMemTable(pRepo->imem);
tfree(pRepo->rootDir); tfree(pRepo->rootDir);
#ifdef __APPLE__ tsem_destroy(&(pRepo->readyToCommit));
sem_close(pRepo->readyToCommit);
#else // __APPLE__
sem_destroy(&(pRepo->readyToCommit));
#endif // __APPLE__
pthread_mutex_destroy(&pRepo->mutex); pthread_mutex_destroy(&pRepo->mutex);
free(pRepo); free(pRepo);
} }
......
...@@ -207,11 +207,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { ...@@ -207,11 +207,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
int tsdbAsyncCommit(STsdbRepo *pRepo) { int tsdbAsyncCommit(STsdbRepo *pRepo) {
if (pRepo->mem == NULL) return 0; if (pRepo->mem == NULL) return 0;
#ifdef __APPLE__ tsem_wait(&(pRepo->readyToCommit));
sem_wait(pRepo->readyToCommit);
#else // __APPLE__
sem_wait(&(pRepo->readyToCommit));
#endif // __APPLE__
ASSERT(pRepo->imem == NULL); ASSERT(pRepo->imem == NULL);
...@@ -233,13 +229,8 @@ int tsdbSyncCommit(TSDB_REPO_T *repo) { ...@@ -233,13 +229,8 @@ int tsdbSyncCommit(TSDB_REPO_T *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
tsdbAsyncCommit(pRepo); tsdbAsyncCommit(pRepo);
#ifdef __APPLE__ tsem_wait(&(pRepo->readyToCommit));
sem_wait(pRepo->readyToCommit); tsem_post(&(pRepo->readyToCommit));
sem_post(pRepo->readyToCommit);
#else // __APPLE__
sem_wait(&(pRepo->readyToCommit));
sem_post(&(pRepo->readyToCommit));
#endif // __APPLE__
if (pRepo->code != TSDB_CODE_SUCCESS) { if (pRepo->code != TSDB_CODE_SUCCESS) {
terrno = pRepo->code; terrno = pRepo->code;
......
...@@ -13,6 +13,12 @@ ...@@ -13,6 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// how to use to do a pressure-test upon eok
// tester: cat /dev/urandom | nc -c <ip> <port>
// testee: ./debug/build/bin/epoll -l <port> > /dev/null
// compare against: nc -l <port> > /dev/null
// monitor and compare : glances
#ifdef __APPLE__ #ifdef __APPLE__
#include "eok.h" #include "eok.h"
#else // __APPLE__ #else // __APPLE__
...@@ -68,55 +74,62 @@ static int ep_dummy = 0; ...@@ -68,55 +74,62 @@ static int ep_dummy = 0;
static ep_t* ep_create(void); static ep_t* ep_create(void);
static void ep_destroy(ep_t *ep); static void ep_destroy(ep_t *ep);
static void* routine(void* arg); static void* routine(void* arg);
static int open_connect(unsigned short port);
static int open_listen(unsigned short port); static int open_listen(unsigned short port);
typedef struct client_s client_t; typedef struct fde_s fde_t;
struct client_s { struct fde_s {
int skt; int skt;
void (*on_event)(ep_t *ep, struct epoll_event *events, client_t *client); void (*on_event)(ep_t *ep, struct epoll_event *events, fde_t *client);
volatile unsigned int state; // 1: listenning; 2: connected
}; };
static void echo_event(ep_t *ep, struct epoll_event *ev, client_t *client); static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client);
static void null_event(ep_t *ep, struct epoll_event *ev, fde_t *client);
#define usage(arg0, fmt, ...) do { \
if (fmt[0]) { \
fprintf(stderr, "" fmt "\n", ##__VA_ARGS__); \
} \
fprintf(stderr, "usage:\n"); \
fprintf(stderr, " %s -l <port> : specify listenning port\n", arg0); \
} while (0)
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
char *prg = basename(argv[0]);
if (argc==1) {
usage(prg, "");
return 0;
}
ep_t* ep = ep_create(); ep_t* ep = ep_create();
A(ep, "failed"); A(ep, "failed");
int skt = open_connect(6789); for (int i=1; i<argc; ++i) {
if (skt!=-1) { const char *arg = argv[i];
client_t *client = (client_t*)calloc(1, sizeof(*client)); if (0==strcmp(arg, "-l")) {
if (client) { ++i;
client->skt = skt; if (i>=argc) {
client->on_event = echo_event; usage(prg, "expecting <port> after -l, but got nothing");
client->state = 2; return 1; // confirmed potential leakage
struct epoll_event ev = {0}; }
ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; arg = argv[i];
ev.data.ptr = client; int port = atoi(arg);
A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); int skt = open_listen(port);
} if (skt==-1) continue;
} fde_t *client = (fde_t*)calloc(1, sizeof(*client));
skt = open_listen(0); if (!client) {
if (skt!=-1) { E("out of memory");
client_t *client = (client_t*)calloc(1, sizeof(*client)); close(skt);
if (client) { continue;
}
client->skt = skt; client->skt = skt;
client->on_event = echo_event; client->on_event = listen_event;
client->state = 1;
struct epoll_event ev = {0}; struct epoll_event ev = {0};
ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ev.data.ptr = client; ev.data.ptr = client;
A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), "");
continue;
} }
usage(prg, "unknown argument: [%s]", arg);
return 1;
} }
// char c = '\0';
// while ((c=getchar())!=EOF) {
// switch (c) {
// case 'q': break;
// default: continue;
// }
// }
// getchar();
char *line = NULL; char *line = NULL;
size_t linecap = 0; size_t linecap = 0;
ssize_t linelen; ssize_t linelen;
...@@ -205,7 +218,7 @@ static void* routine(void* arg) { ...@@ -205,7 +218,7 @@ static void* routine(void* arg) {
continue; continue;
} }
A(ev->data.ptr, "internal logic error"); A(ev->data.ptr, "internal logic error");
client_t *client = (client_t*)ev->data.ptr; fde_t *client = (fde_t*)ev->data.ptr;
client->on_event(ep, ev, client); client->on_event(ep, ev, client);
continue; continue;
} }
...@@ -223,7 +236,7 @@ static int open_listen(unsigned short port) { ...@@ -223,7 +236,7 @@ static int open_listen(unsigned short port) {
do { do {
struct sockaddr_in si = {0}; struct sockaddr_in si = {0};
si.sin_family = AF_INET; si.sin_family = AF_INET;
si.sin_addr.s_addr = inet_addr("127.0.0.1"); si.sin_addr.s_addr = inet_addr("0.0.0.0");
si.sin_port = htons(port); si.sin_port = htons(port);
r = bind(skt, (struct sockaddr*)&si, sizeof(si)); r = bind(skt, (struct sockaddr*)&si, sizeof(si));
if (r) { if (r) {
...@@ -249,63 +262,31 @@ static int open_listen(unsigned short port) { ...@@ -249,63 +262,31 @@ static int open_listen(unsigned short port) {
return -1; return -1;
} }
static int open_connect(unsigned short port) { static void listen_event(ep_t *ep, struct epoll_event *ev, fde_t *client) {
int r = 0; A(ev->events & EPOLLIN, "internal logic error");
int skt = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); struct sockaddr_in si = {0};
if (skt==-1) { socklen_t silen = sizeof(si);
E("socket() failed"); int skt = accept(client->skt, (struct sockaddr*)&si, &silen);
return -1; A(skt!=-1, "internal logic error");
fde_t *server = (fde_t*)calloc(1, sizeof(*server));
if (!server) {
close(skt);
return;
} }
do { server->skt = skt;
struct sockaddr_in si = {0}; server->on_event = null_event;
si.sin_family = AF_INET; struct epoll_event ee = {0};
si.sin_addr.s_addr = inet_addr("127.0.0.1"); ee.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
si.sin_port = htons(port); ee.data.ptr = server;
r = connect(skt, (struct sockaddr*)&si, sizeof(si)); A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ee), "");
if (r) {
E("connect(%u) failed", port);
break;
}
memset(&si, 0, sizeof(si));
socklen_t len = sizeof(si);
r = getsockname(skt, (struct sockaddr *)&si, &len);
if (r) {
E("getsockname() failed");
}
A(len==sizeof(si), "internal logic error");
D("connected: %d", ntohs(si.sin_port));
return skt;
} while (0);
close(skt);
return -1;
} }
static void echo_event(ep_t *ep, struct epoll_event *ev, client_t *client) { static void null_event(ep_t *ep, struct epoll_event *ev, fde_t *client) {
if (ev->events & EPOLLIN) { if (ev->events & EPOLLIN) {
if (client->state==1) { char buf[8192];
struct sockaddr_in si = {0}; int n = recv(client->skt, buf, sizeof(buf), 0);
socklen_t silen = sizeof(si); A(n>=0 && n<=sizeof(buf), "internal logic error:[%d]", n);
int skt = accept(client->skt, (struct sockaddr*)&si, &silen); A(n==fwrite(buf, 1, n, stdout), "internal logic error");
if (skt!=-1) {
client_t *server = (client_t*)calloc(1, sizeof(*server));
if (server) {
server->skt = skt;
server->on_event = echo_event;
server->state = 2;
struct epoll_event ev = {0};
ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ev.data.ptr = server;
A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), "");
}
}
}
if (client->state==2) {
char buf[4];
int n = recv(client->skt, buf, sizeof(buf)-1, 0);
A(n>=0 && n<sizeof(buf), "internal logic error:[%d]", n);
buf[n] = '\0';
fprintf(stderr, "events[%x]:%s\n", ev->events, buf);
}
} }
if (ev->events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { if (ev->events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
A(0==pthread_mutex_lock(&ep->lock), ""); A(0==pthread_mutex_lock(&ep->lock), "");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册