From 3c82bdfa51709a355407030c52895ec36cf39e53 Mon Sep 17 00:00:00 2001 From: freemine Date: Tue, 2 Feb 2021 23:50:13 +0800 Subject: [PATCH] first taosd on macosx --- src/dnode/src/dnodeShell.c | 24 ++--- src/dnode/src/dnodeStep.c | 4 +- src/dnode/src/dnodeSystem.c | 2 +- src/dnode/src/dnodeVWrite.c | 2 +- src/os/inc/osDarwin.h | 3 +- src/os/src/darwin/darwinEnv.c | 10 +- src/os/src/darwin/darwinFile.c | 94 +++++++----------- src/os/src/darwin/darwinSemphone.c | 40 ++++++++ src/os/src/darwin/darwinSysInfo.c | 150 +++++++++++++++++++++++++---- src/os/src/detail/osSemphone.c | 2 +- src/plugins/http/src/httpSystem.c | 5 + src/rpc/src/rpcTcp.c | 87 +++++++++-------- src/sync/src/syncTcp.c | 5 + src/util/src/tsocket.c | 15 ++- tests/examples/c/epoll.c | 5 +- 15 files changed, 304 insertions(+), 144 deletions(-) diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index a40df626e2..9a226b81e8 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -36,12 +36,12 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; - + // the following message shall be treated as mnode write dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= dnodeDispatchToMWriteQueue; @@ -57,13 +57,13 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMWriteQueue; - - // the following message shall be treated as mnode query + + // the following message shall be treated as mnode query dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue; @@ -153,7 +153,7 @@ static int32_t dnodeAuthNettestUser(char *user, char *spi, char *encrypt, char * } static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { - if (dnodeAuthNettestUser(user, spi, encrypt, secret, ckey) == 0) return 0; + if (dnodeAuthNettestUser(user, spi, encrypt, secret, ckey) == 0) return 0; int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); if (code != TSDB_CODE_APP_NOT_READY) return code; @@ -164,7 +164,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char rpcMsg.pCont = pMsg; rpcMsg.contLen = sizeof(SAuthMsg); rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH; - + dDebug("user:%s, send auth msg to mnodes", user); SRpcMsg rpcRsp = {0}; dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); @@ -202,14 +202,14 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) { SRpcMsg rpcRsp = {0}; dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); terrno = rpcRsp.code; - + if (rpcRsp.code != 0) { rpcFreeCont(rpcRsp.pCont); dError("vgId:%d, tid:%d failed to config table from mnode", vgId, tid); return NULL; } else { dInfo("vgId:%d, tid:%d config table msg is received", vgId, tid); - + // delete this after debug finished SMDCreateTableMsg *pTable = rpcRsp.pCont; int16_t numOfColumns = htons(pTable->numOfColumns); @@ -231,4 +231,4 @@ SStatisInfo dnodeGetStatisInfo() { } return info; -} \ No newline at end of file +} diff --git a/src/dnode/src/dnodeStep.c b/src/dnode/src/dnodeStep.c index 8878299d94..eda6fb227d 100644 --- a/src/dnode/src/dnodeStep.c +++ b/src/dnode/src/dnodeStep.c @@ -69,6 +69,6 @@ int32_t dnodeStepInit(SStep *pSteps, int32_t stepSize) { return 0; } -void dnodeStepCleanup(SStep *pSteps, int32_t stepSize) { +void dnodeStepCleanup(SStep *pSteps, int32_t stepSize) { taosStepCleanupImp(pSteps, stepSize - 1); -} \ No newline at end of file +} diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 33ebc8b64f..e49b3eba99 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -181,4 +181,4 @@ static void sigintHandler(int32_t signum, void *sigInfo, void *context) { #ifdef WINDOWS tsem_wait(&exitSem); #endif -} \ No newline at end of file +} diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 93d1611ebc..a3ff459396 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -53,7 +53,7 @@ void dnodeCleanupVWrite() { for (int32_t i = 0; i < tsVWriteWP.max; ++i) { SVWriteWorker *pWorker = tsVWriteWP.worker + i; if (taosCheckPthreadValid(pWorker->thread)) { - taosQsetThreadResume(pWorker->qset); + if (pWorker->qset) taosQsetThreadResume(pWorker->qset); } } diff --git a/src/os/inc/osDarwin.h b/src/os/inc/osDarwin.h index 2a05d5682e..7c206afe7a 100644 --- a/src/os/inc/osDarwin.h +++ b/src/os/inc/osDarwin.h @@ -85,6 +85,7 @@ extern "C" { #define TAOS_OS_FUNC_STRING_STR2INT64 #define TAOS_OS_FUNC_SYSINFO #define TAOS_OS_FUNC_TIMER +#define TAOS_OS_FUNC_SEMPHONE_PTHREAD // specific #define htobe64 htonll @@ -114,7 +115,7 @@ int64_t tsosStr2int64(char *str); void taos_block_sigalrm(void); #define TAOS_OS_DEF_EPOLL - #define TAOS_EPOLL_WAIT_TIME 500 + #define TAOS_EPOLL_WAIT_TIME 500 typedef int32_t SOCKET; typedef SOCKET EpollFd; #define EpollClose(pollFd) epoll_close(pollFd) diff --git a/src/os/src/darwin/darwinEnv.c b/src/os/src/darwin/darwinEnv.c index da4b32139e..21736f77fb 100644 --- a/src/os/src/darwin/darwinEnv.c +++ b/src/os/src/darwin/darwinEnv.c @@ -19,15 +19,17 @@ void osInit() { if (configDir[0] == 0) { - strcpy(configDir, "~/TDengine/cfg"); + strcpy(configDir, "/etc/taos"); } strcpy(tsVnodeDir, ""); strcpy(tsDnodeDir, ""); strcpy(tsMnodeDir, ""); - strcpy(tsDataDir, "~/TDengine/data"); - strcpy(tsLogDir, "~/TDengine/log"); - strcpy(tsScriptDir, "~/TDengine/cfg"); + + strcpy(tsDataDir, "/tmp/taosd/data"); + strcpy(tsLogDir, "/tmp/taosd/log"); + strcpy(tsScriptDir, "/etc/taos"); + strcpy(tsOsName, "Darwin"); } diff --git a/src/os/src/darwin/darwinFile.c b/src/os/src/darwin/darwinFile.c index 1e77cd68d8..4236ea1c5f 100644 --- a/src/os/src/darwin/darwinFile.c +++ b/src/os/src/darwin/darwinFile.c @@ -17,71 +17,49 @@ #include "os.h" #include "tulog.h" -#define _SEND_FILE_STEP_ 1000 - int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t count) { - fseek(in_file, (int32_t)(*offset), 0); - int writeLen = 0; - uint8_t buffer[_SEND_FILE_STEP_] = {0}; - - for (int len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) { - size_t rlen = fread(buffer, 1, _SEND_FILE_STEP_, in_file); - if (rlen <= 0) { - return writeLen; - } else if (rlen < _SEND_FILE_STEP_) { - fwrite(buffer, 1, rlen, out_file); - return (int)(writeLen + rlen); - } else { - fwrite(buffer, 1, _SEND_FILE_STEP_, in_file); - writeLen += _SEND_FILE_STEP_; - } + int r = 0; + if (offset) { + r = fseek(in_file, *offset, SEEK_SET); + if (r==-1) return -1; } - - int remain = count - writeLen; - if (remain > 0) { - size_t rlen = fread(buffer, 1, remain, in_file); - if (rlen <= 0) { - return writeLen; - } else { - fwrite(buffer, 1, remain, out_file); - writeLen += remain; + off_t len = count; + while (len>0) { + char buf[1024*16]; + off_t n = sizeof(buf); + if (len 0) { - int32_t rlen = read(sfd, buffer, (int32_t)remain); - if (rlen <= 0) { - return writeLen; - } - else { - taosWriteSocket(sfd, buffer, (int32_t)remain); - writeLen += remain; - } + off_t len = count; + while (len>0) { + char buf[1024*16]; + off_t n = sizeof(buf); + if (len + // #define SEM_USE_PTHREAD // #define SEM_USE_POSIX #define SEM_USE_SEM @@ -279,3 +281,41 @@ int tsem_destroy(tsem_t *sem) { return 0; } +bool taosCheckPthreadValid(pthread_t thread) { + uint64_t id = 0; + int r = pthread_threadid_np(thread, &id); + return r ? false : true; +} + +int64_t taosGetSelfPthreadId() { + return (int64_t)pthread_self(); +} + +int64_t taosGetPthreadId(pthread_t thread) { + return (int64_t)thread; +} + +void taosResetPthread(pthread_t* thread) { + *thread = NULL; +} + +bool taosComparePthread(pthread_t first, pthread_t second) { + return pthread_equal(first, second) ? true : false; +} + +int32_t taosGetPId() { + return (int32_t)getpid(); +} + +int32_t taosGetCurrentAPPName(char* name, int32_t* len) { + char buf[PATH_MAX+1]; + buf[0] = '\0'; + proc_name(getpid(), buf, sizeof(buf)-1); + buf[PATH_MAX] = '\0'; + size_t n = strlen(buf); + if (len) *len = n; + if (name) strcpy(name, buf); + return 0; +} + + diff --git a/src/os/src/darwin/darwinSysInfo.c b/src/os/src/darwin/darwinSysInfo.c index bce60429c5..6af6285f56 100644 --- a/src/os/src/darwin/darwinSysInfo.c +++ b/src/os/src/darwin/darwinSysInfo.c @@ -24,42 +24,134 @@ static void taosGetSystemTimezone() { - // get and set default timezone SGlobalCfg *cfg_timezone = taosGetConfigOption("timezone"); - if (cfg_timezone && cfg_timezone->cfgStatus < TAOS_CFG_CSTATUS_DEFAULT) { - char *tz = getenv("TZ"); - if (tz == NULL || strlen(tz) == 0) { - strcpy(tsTimezone, "not configured"); + if (cfg_timezone == NULL) return; + if (cfg_timezone->cfgStatus >= TAOS_CFG_CSTATUS_DEFAULT) { + return; + } + + /* load time zone string from /etc/localtime */ + char buf[4096]; + char *tz = NULL; { + int n = readlink("/etc/localtime", buf, sizeof(buf)); + if (n<0) { + uError("read /etc/localtime error, reason:%s", strerror(errno)); + return; + } + buf[n] = '\0'; + for(int i=n-1; i>=0; --i) { + if (buf[i]=='/') { + if (tz) { + tz = buf + i + 1; + break; + } + tz = buf + i + 1; + } } - else { - strcpy(tsTimezone, tz); + if (!tz || 0==strchr(tz, '/')) { + uError("parsing /etc/localtime failed"); + return; } - cfg_timezone->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT; - uInfo("timezone not configured, use default"); + + setenv("TZ", tz, 1); + tzset(); } + + /* + * NOTE: do not remove it. + * Enforce set the correct daylight saving time(DST) flag according + * to current time + */ + time_t tx1 = time(NULL); + struct tm tm1; + localtime_r(&tx1, &tm1); + + /* + * format example: + * + * Asia/Shanghai (CST, +0800) + * Europe/London (BST, +0100) + */ + snprintf(tsTimezone, TSDB_TIMEZONE_LEN, "%s (%s, %+03ld00)", + tz, tm1.tm_isdst ? tzname[daylight] : tzname[0], -timezone/3600); + + // cfg_timezone->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT; + uWarn("timezone not configured, set to system default:%s", tsTimezone); } -static void taosGetSystemLocale() { - // get and set default locale +/* + * originally from src/os/src/detail/osSysinfo.c + * POSIX format locale string: + * (Language Strings)_(Country/Region Strings).(code_page) + * + * example: en_US.UTF-8, zh_CN.GB18030, zh_CN.UTF-8, + * + * if user does not specify the locale in taos.cfg the program use default LC_CTYPE as system locale. + * + * In case of some CentOS systems, their default locale is "en_US.utf8", which is not valid code_page + * for libiconv that is employed to convert string in this system. This program will automatically use + * UTF-8 instead as the charset. + * + * In case of windows client, the locale string is not valid POSIX format, user needs to set the + * correct code_page for libiconv. Usually, the code_page of windows system with simple chinese is + * CP936, CP437 for English charset. + * + */ +static void taosGetSystemLocale() { // get and set default locale + char sep = '.'; + char *locale = NULL; + SGlobalCfg *cfg_locale = taosGetConfigOption("locale"); if (cfg_locale && cfg_locale->cfgStatus < TAOS_CFG_CSTATUS_DEFAULT) { - char *locale = setlocale(LC_CTYPE, "chs"); - if (locale != NULL) { + locale = setlocale(LC_CTYPE, ""); + if (locale == NULL) { + uError("can't get locale from system, set it to en_US.UTF-8 since error:%d:%s", errno, strerror(errno)); + strcpy(tsLocale, "en_US.UTF-8"); + } else { tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN); - cfg_locale->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT; - uInfo("locale not configured, set to default:%s", tsLocale); + uWarn("locale not configured, set to system default:%s", tsLocale); } } + /* if user does not specify the charset, extract it from locale */ SGlobalCfg *cfg_charset = taosGetConfigOption("charset"); if (cfg_charset && cfg_charset->cfgStatus < TAOS_CFG_CSTATUS_DEFAULT) { - strcpy(tsCharset, "cp936"); - cfg_charset->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT; - uInfo("charset not configured, set to default:%s", tsCharset); + char *str = strrchr(tsLocale, sep); + if (str != NULL) { + str++; + + char *revisedCharset = taosCharsetReplace(str); + tstrncpy(tsCharset, revisedCharset, TSDB_LOCALE_LEN); + + free(revisedCharset); + uWarn("charset not configured, set to system default:%s", tsCharset); + } else { + strcpy(tsCharset, "UTF-8"); + uWarn("can't get locale and charset from system, set it to UTF-8"); + } } } -void taosPrintOsInfo() {} +void taosPrintOsInfo() { + uInfo(" os pageSize: %" PRId64 "(KB)", tsPageSize / 1024); + // uInfo(" os openMax: %" PRId64, tsOpenMax); + // uInfo(" os streamMax: %" PRId64, tsStreamMax); + uInfo(" os numOfCores: %d", tsNumOfCores); + uInfo(" os totalDisk: %f(GB)", tsTotalDataDirGB); + uInfo(" os totalMemory: %d(MB)", tsTotalMemoryMB); + + struct utsname buf; + if (uname(&buf)) { + uInfo(" can't fetch os info"); + return; + } + uInfo(" os sysname: %s", buf.sysname); + uInfo(" os nodename: %s", buf.nodename); + uInfo(" os release: %s", buf.release); + uInfo(" os version: %s", buf.version); + uInfo(" os machine: %s", buf.machine); + uInfo("=================================="); +} void taosKillSystem() { uError("function taosKillSystem, exit!"); @@ -67,6 +159,22 @@ void taosKillSystem() { } void taosGetSystemInfo() { + // taosGetProcInfos(); + + tsNumOfCores = sysconf(_SC_NPROCESSORS_ONLN); + long physical_pages = sysconf(_SC_PHYS_PAGES); + long page_size = sysconf(_SC_PAGESIZE); + tsTotalMemoryMB = physical_pages * page_size / (1024 * 1024); + tsPageSize = page_size; + + // float tmp1, tmp2; + // taosGetSysMemory(&tmp1); + // taosGetProcMemory(&tmp2); + // taosGetDisk(); + // taosGetBandSpeed(&tmp1); + // taosGetCpuUsage(&tmp1, &tmp2); + // taosGetProcIO(&tmp1, &tmp2); + taosGetSystemTimezone(); taosGetSystemLocale(); } @@ -121,7 +229,6 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { char cmdline[1024]; char *taosGetCmdlineByPID(int pid) { - errno = 0; if (proc_pidpath(pid, cmdline, sizeof(cmdline)) <= 0) { @@ -136,6 +243,7 @@ bool taosGetSystemUid(char *uid) { uuid_t uuid = {0}; uuid_generate(uuid); // it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null - uuid_unparse(uuid, uid); + uuid_unparse_lower(uuid, uid); return true; } + diff --git a/src/os/src/detail/osSemphone.c b/src/os/src/detail/osSemphone.c index d379e56ed8..1d05ce20a5 100644 --- a/src/os/src/detail/osSemphone.c +++ b/src/os/src/detail/osSemphone.c @@ -62,4 +62,4 @@ int32_t taosGetCurrentAPPName(char *name, int32_t* len) { return 0; } -#endif \ No newline at end of file +#endif diff --git a/src/plugins/http/src/httpSystem.c b/src/plugins/http/src/httpSystem.c index 4721451529..203db21895 100644 --- a/src/plugins/http/src/httpSystem.c +++ b/src/plugins/http/src/httpSystem.c @@ -92,6 +92,11 @@ void httpStopSystem() { tsHttpServer.stop = 1; #ifdef WINDOWS closesocket(tsHttpServer.fd); +#elif __APPLE__ + if (tsHttpServer.fd!=-1) { + close(tsHttpServer.fd); + tsHttpServer.fd = -1; + } #else shutdown(tsHttpServer.fd, SHUT_RD); #endif diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 3295c130dd..3162ab2e4c 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -17,7 +17,7 @@ #include "tsocket.h" #include "tutil.h" #include "taosdef.h" -#include "taoserror.h" +#include "taoserror.h" #include "rpcLog.h" #include "rpcHead.h" #include "rpcTcp.h" @@ -81,7 +81,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1); if (pServerObj == NULL) { tError("TCP:%s no enough memory", label); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } @@ -95,7 +95,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads); if (pServerObj->pThreadObj == NULL) { tError("TCP:%s no enough memory", label); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); free(pServerObj); return NULL; } @@ -105,18 +105,18 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - // initialize parameters in case it may encounter error later + // initialize parameters in case it may encounter error later for (int i = 0; i < numOfThreads; ++i) { pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1); if (pThreadObj == NULL) { tError("TCP:%s no enough memory", label); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); for (int j=0; jpThreadObj[j]); free(pServerObj->pThreadObj); free(pServerObj); return NULL; } - + pServerObj->pThreadObj[i] = pThreadObj; pThreadObj->pollFd = -1; taosResetPthread(&pThreadObj->thread); @@ -152,9 +152,9 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); - if (pServerObj->fd < 0) code = -1; + if (pServerObj->fd < 0) code = -1; - if (code == 0) { + if (code == 0) { code = pthread_create(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj); if (code != 0) { tError("%s failed to create TCP accept thread(%s)", label, strerror(code)); @@ -162,7 +162,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread } if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); taosCleanUpTcpServer(pServerObj); pServerObj = NULL; } else { @@ -175,8 +175,8 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread static void taosStopTcpThread(SThreadObj* pThreadObj) { if (pThreadObj == NULL) { return;} - // save thread into local variable and signal thread to stop - pthread_t thread = pThreadObj->thread; + // save thread into local variable and signal thread to stop + pthread_t thread = pThreadObj->thread; if (!taosCheckPthreadValid(thread)) { return; } @@ -197,6 +197,11 @@ void taosStopTcpServer(void *handle) { if (pServerObj->fd >= 0) { #ifdef WINDOWS closesocket(pServerObj->fd); +#elif defined(__APPLE__) + if (pServerObj->fd!=-1) { + close(pServerObj->fd); + pServerObj->fd = -1; + } #else shutdown(pServerObj->fd, SHUT_RD); #endif @@ -265,7 +270,7 @@ static void *taosAcceptTcpConnection(void *arg) { taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); continue; } - + // pick up the thread to handle this connection pThreadObj = pServerObj->pThreadObj[threadId]; @@ -274,13 +279,13 @@ static void *taosAcceptTcpConnection(void *arg) { if (pFdObj) { pFdObj->ip = caddr.sin_addr.s_addr; pFdObj->port = htons(caddr.sin_port); - tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, + tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); } else { taosCloseSocket(connFd); tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); - } + } // pick up next thread for next connection threadId++; @@ -295,17 +300,17 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread SClientObj *pClientObj = (SClientObj *)calloc(1, sizeof(SClientObj)); if (pClientObj == NULL) { tError("TCP:%s no enough memory", label); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); return NULL; - } + } tstrncpy(pClientObj->label, label, sizeof(pClientObj->label)); pClientObj->numOfThreads = numOfThreads; - pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj*)); + pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj*)); if (pClientObj->pThreadObj == NULL) { tError("TCP:%s no enough memory", label); tfree(pClientObj); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); } int code = 0; @@ -317,7 +322,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread SThreadObj *pThreadObj = (SThreadObj *)calloc(1, sizeof(SThreadObj)); if (pThreadObj == NULL) { tError("TCP:%s no enough memory", label); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); for (int j=0; jpThreadObj[j]); free(pClientObj); pthread_attr_destroy(&thattr); @@ -356,11 +361,11 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread pThreadObj->threadId = i; } if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); taosCleanUpTcpClient(pClientObj); pClientObj = NULL; - } - return pClientObj; + } + return pClientObj; } void taosStopTcpClient(void *chandle) { @@ -374,20 +379,20 @@ void taosStopTcpClient(void *chandle) { void taosCleanUpTcpClient(void *chandle) { SClientObj *pClientObj = chandle; if (pClientObj == NULL) return; - for (int i = 0; i < pClientObj->numOfThreads; ++i) { + for (int i = 0; i < pClientObj->numOfThreads; ++i) { SThreadObj *pThreadObj= pClientObj->pThreadObj[i]; - taosStopTcpThread(pThreadObj); + taosStopTcpThread(pThreadObj); } - + tDebug("%s TCP client is cleaned up", pClientObj->label); tfree(pClientObj->pThreadObj); - tfree(pClientObj); + tfree(pClientObj); } void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { SClientObj * pClientObj = shandle; int32_t index = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads; - atomic_store_32(&pClientObj->index, index + 1); + atomic_store_32(&pClientObj->index, index + 1); SThreadObj *pThreadObj = pClientObj->pThreadObj[index]; SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); @@ -402,12 +407,12 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin } SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd); - + if (pFdObj) { pFdObj->thandle = thandle; pFdObj->port = port; pFdObj->ip = ip; - tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", + tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds); } else { tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); @@ -422,7 +427,7 @@ void taosCloseTcpConnection(void *chandle) { if (pFdObj == NULL || pFdObj->signature != pFdObj) return; SThreadObj *pThreadObj = pFdObj->pThreadObj; - tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj); + tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj); // pFdObj->thandle = NULL; pFdObj->closedByApp = 1; @@ -435,7 +440,7 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand SThreadObj *pThreadObj = pFdObj->pThreadObj; int ret = taosWriteMsg(pFdObj->fd, data, len); - tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret); + tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret); return ret; } @@ -458,7 +463,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { recvInfo.chandle = NULL; recvInfo.connType = RPC_CONN_TCP; (*(pThreadObj->processData))(&recvInfo); - } + } taosFreeFdObj(pFdObj); } @@ -473,7 +478,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); if (headLen != sizeof(SRpcHead)) { tDebug("%s %p read error, FD:%p headLen:%d", pThreadObj->label, pFdObj->thandle, pFdObj, headLen); - return -1; + return -1; } msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); @@ -491,14 +496,14 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); if (leftLen != retLen) { - tError("%s %p read error, leftLen:%d retLen:%d FD:%p", + tError("%s %p read error, leftLen:%d retLen:%d FD:%p", pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj); free(buffer); return -1; } memcpy(msg, &rpcHead, sizeof(SRpcHead)); - + pInfo->msg = msg; pInfo->msgLen = msgLen; pInfo->ip = pFdObj->ip; @@ -509,7 +514,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { pInfo->connType = RPC_CONN_TCP; if (pFdObj->closedByApp) { - free(buffer); + free(buffer); return -1; } @@ -557,7 +562,7 @@ static void *taosProcessTcpData(void *param) { } if (taosReadTcpData(pFdObj, &recvInfo) < 0) { - shutdown(pFdObj->fd, SHUT_WR); + shutdown(pFdObj->fd, SHUT_WR); continue; } @@ -565,7 +570,7 @@ static void *taosProcessTcpData(void *param) { if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); } - if (pThreadObj->stop) break; + if (pThreadObj->stop) break; } if (pThreadObj->pollFd >=0) { @@ -603,7 +608,7 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) { event.data.ptr = pFdObj; if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { tfree(pFdObj); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } @@ -637,7 +642,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) { pThreadObj->numOfFds--; if (pThreadObj->numOfFds < 0) - tError("%s %p TCP thread:%d, number of FDs is negative!!!", + tError("%s %p TCP thread:%d, number of FDs is negative!!!", pThreadObj->label, pFdObj->thandle, pThreadObj->threadId); if (pFdObj->prev) { @@ -652,7 +657,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) { pthread_mutex_unlock(&pThreadObj->mutex); - tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", + tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pThreadObj->numOfFds); tfree(pFdObj); diff --git a/src/sync/src/syncTcp.c b/src/sync/src/syncTcp.c index 22bdc7e74e..3ad9e9bba0 100644 --- a/src/sync/src/syncTcp.c +++ b/src/sync/src/syncTcp.c @@ -103,6 +103,11 @@ void syncCloseTcpThreadPool(void *param) { #ifdef WINDOWS closesocket(pPool->acceptFd); +#elif defined(__APPLE__) + if (pPool->acceptFd!=-1) { + close(pPool->acceptFd); + pPool->acceptFd = -1; + } #else shutdown(pPool->acceptFd, SHUT_RD); #endif diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index 7ccdca0fb1..e075876867 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -32,14 +32,27 @@ int32_t taosGetFqdn(char *fqdn) { struct addrinfo hints = {0}; struct addrinfo *result = NULL; +#ifdef __APPLE__ + // on macosx, hostname -f has the form of xxx.local + // which will block getaddrinfo for a few seconds if AI_CANONNAME is set + // thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return + // immediately + hints.ai_family = AF_INET; +#else // __APPLE__ hints.ai_flags = AI_CANONNAME; +#endif // __APPLE__ int32_t ret = getaddrinfo(hostname, NULL, &hints, &result); if (!result) { uError("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); return -1; } +#ifdef __APPLE__ + // refer to comments above + strcpy(fqdn, hostname); +#else // __APPLE__ strcpy(fqdn, result->ai_canonname); +#endif // __APPLE__ freeaddrinfo(result); return 0; } @@ -286,7 +299,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie int32_t bufSize = 1024 * 1024; sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - + if (sockFd <= 2) { uError("failed to open the socket: %d (%s)", errno, strerror(errno)); taosCloseSocketNoCheck(sockFd); diff --git a/tests/examples/c/epoll.c b/tests/examples/c/epoll.c index 20ab395158..de7c5989d1 100644 --- a/tests/examples/c/epoll.c +++ b/tests/examples/c/epoll.c @@ -38,6 +38,8 @@ #include #include #include +#include +#include #define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__) #define A(statement, fmt, ...) do { \ @@ -56,6 +58,8 @@ ##__VA_ARGS__); \ } while (0) +#include "os.h" + typedef struct ep_s ep_t; struct ep_s { int ep; @@ -214,7 +218,6 @@ static void* routine(void* arg) { A(0==pthread_mutex_lock(&ep->lock), ""); ep->wakenup = 0; A(0==pthread_mutex_unlock(&ep->lock), ""); - D("........"); continue; } A(ev->data.ptr, "internal logic error"); -- GitLab