提交 3c82bdfa 编写于 作者: F freemine

first taosd on macosx

上级 aefc6052
...@@ -36,12 +36,12 @@ int32_t dnodeInitShell() { ...@@ -36,12 +36,12 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= dnodeDispatchToMWriteQueue;
...@@ -57,13 +57,13 @@ int32_t dnodeInitShell() { ...@@ -57,13 +57,13 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMWriteQueue;
// the following message shall be treated as mnode query // the following message shall be treated as mnode query
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue;
...@@ -153,7 +153,7 @@ static int32_t dnodeAuthNettestUser(char *user, char *spi, char *encrypt, char * ...@@ -153,7 +153,7 @@ static int32_t dnodeAuthNettestUser(char *user, char *spi, char *encrypt, char *
} }
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (dnodeAuthNettestUser(user, spi, encrypt, secret, ckey) == 0) return 0; if (dnodeAuthNettestUser(user, spi, encrypt, secret, ckey) == 0) return 0;
int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_APP_NOT_READY) return code; if (code != TSDB_CODE_APP_NOT_READY) return code;
...@@ -164,7 +164,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char ...@@ -164,7 +164,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
rpcMsg.pCont = pMsg; rpcMsg.pCont = pMsg;
rpcMsg.contLen = sizeof(SAuthMsg); rpcMsg.contLen = sizeof(SAuthMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH; rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH;
dDebug("user:%s, send auth msg to mnodes", user); dDebug("user:%s, send auth msg to mnodes", user);
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
...@@ -202,14 +202,14 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) { ...@@ -202,14 +202,14 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid) {
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
terrno = rpcRsp.code; terrno = rpcRsp.code;
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
rpcFreeCont(rpcRsp.pCont); rpcFreeCont(rpcRsp.pCont);
dError("vgId:%d, tid:%d failed to config table from mnode", vgId, tid); dError("vgId:%d, tid:%d failed to config table from mnode", vgId, tid);
return NULL; return NULL;
} else { } else {
dInfo("vgId:%d, tid:%d config table msg is received", vgId, tid); dInfo("vgId:%d, tid:%d config table msg is received", vgId, tid);
// delete this after debug finished // delete this after debug finished
SMDCreateTableMsg *pTable = rpcRsp.pCont; SMDCreateTableMsg *pTable = rpcRsp.pCont;
int16_t numOfColumns = htons(pTable->numOfColumns); int16_t numOfColumns = htons(pTable->numOfColumns);
...@@ -231,4 +231,4 @@ SStatisInfo dnodeGetStatisInfo() { ...@@ -231,4 +231,4 @@ SStatisInfo dnodeGetStatisInfo() {
} }
return info; return info;
} }
\ No newline at end of file
...@@ -69,6 +69,6 @@ int32_t dnodeStepInit(SStep *pSteps, int32_t stepSize) { ...@@ -69,6 +69,6 @@ int32_t dnodeStepInit(SStep *pSteps, int32_t stepSize) {
return 0; return 0;
} }
void dnodeStepCleanup(SStep *pSteps, int32_t stepSize) { void dnodeStepCleanup(SStep *pSteps, int32_t stepSize) {
taosStepCleanupImp(pSteps, stepSize - 1); taosStepCleanupImp(pSteps, stepSize - 1);
} }
\ No newline at end of file
...@@ -181,4 +181,4 @@ static void sigintHandler(int32_t signum, void *sigInfo, void *context) { ...@@ -181,4 +181,4 @@ static void sigintHandler(int32_t signum, void *sigInfo, void *context) {
#ifdef WINDOWS #ifdef WINDOWS
tsem_wait(&exitSem); tsem_wait(&exitSem);
#endif #endif
} }
\ No newline at end of file
...@@ -53,7 +53,7 @@ void dnodeCleanupVWrite() { ...@@ -53,7 +53,7 @@ void dnodeCleanupVWrite() {
for (int32_t i = 0; i < tsVWriteWP.max; ++i) { for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SVWriteWorker *pWorker = tsVWriteWP.worker + i; SVWriteWorker *pWorker = tsVWriteWP.worker + i;
if (taosCheckPthreadValid(pWorker->thread)) { if (taosCheckPthreadValid(pWorker->thread)) {
taosQsetThreadResume(pWorker->qset); if (pWorker->qset) taosQsetThreadResume(pWorker->qset);
} }
} }
......
...@@ -85,6 +85,7 @@ extern "C" { ...@@ -85,6 +85,7 @@ extern "C" {
#define TAOS_OS_FUNC_STRING_STR2INT64 #define TAOS_OS_FUNC_STRING_STR2INT64
#define TAOS_OS_FUNC_SYSINFO #define TAOS_OS_FUNC_SYSINFO
#define TAOS_OS_FUNC_TIMER #define TAOS_OS_FUNC_TIMER
#define TAOS_OS_FUNC_SEMPHONE_PTHREAD
// specific // specific
#define htobe64 htonll #define htobe64 htonll
...@@ -114,7 +115,7 @@ int64_t tsosStr2int64(char *str); ...@@ -114,7 +115,7 @@ int64_t tsosStr2int64(char *str);
void taos_block_sigalrm(void); void taos_block_sigalrm(void);
#define TAOS_OS_DEF_EPOLL #define TAOS_OS_DEF_EPOLL
#define TAOS_EPOLL_WAIT_TIME 500 #define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET; typedef int32_t SOCKET;
typedef SOCKET EpollFd; typedef SOCKET EpollFd;
#define EpollClose(pollFd) epoll_close(pollFd) #define EpollClose(pollFd) epoll_close(pollFd)
......
...@@ -19,15 +19,17 @@ ...@@ -19,15 +19,17 @@
void osInit() { void osInit() {
if (configDir[0] == 0) { if (configDir[0] == 0) {
strcpy(configDir, "~/TDengine/cfg"); strcpy(configDir, "/etc/taos");
} }
strcpy(tsVnodeDir, ""); strcpy(tsVnodeDir, "");
strcpy(tsDnodeDir, ""); strcpy(tsDnodeDir, "");
strcpy(tsMnodeDir, ""); strcpy(tsMnodeDir, "");
strcpy(tsDataDir, "~/TDengine/data");
strcpy(tsLogDir, "~/TDengine/log"); strcpy(tsDataDir, "/tmp/taosd/data");
strcpy(tsScriptDir, "~/TDengine/cfg"); strcpy(tsLogDir, "/tmp/taosd/log");
strcpy(tsScriptDir, "/etc/taos");
strcpy(tsOsName, "Darwin"); strcpy(tsOsName, "Darwin");
} }
...@@ -17,71 +17,49 @@ ...@@ -17,71 +17,49 @@
#include "os.h" #include "os.h"
#include "tulog.h" #include "tulog.h"
#define _SEND_FILE_STEP_ 1000
int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t count) { int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t count) {
fseek(in_file, (int32_t)(*offset), 0); int r = 0;
int writeLen = 0; if (offset) {
uint8_t buffer[_SEND_FILE_STEP_] = {0}; r = fseek(in_file, *offset, SEEK_SET);
if (r==-1) return -1;
for (int len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) {
size_t rlen = fread(buffer, 1, _SEND_FILE_STEP_, in_file);
if (rlen <= 0) {
return writeLen;
} else if (rlen < _SEND_FILE_STEP_) {
fwrite(buffer, 1, rlen, out_file);
return (int)(writeLen + rlen);
} else {
fwrite(buffer, 1, _SEND_FILE_STEP_, in_file);
writeLen += _SEND_FILE_STEP_;
}
} }
off_t len = count;
int remain = count - writeLen; while (len>0) {
if (remain > 0) { char buf[1024*16];
size_t rlen = fread(buffer, 1, remain, in_file); off_t n = sizeof(buf);
if (rlen <= 0) { if (len<n) n = len;
return writeLen; size_t m = fread(buf, 1, n, in_file);
} else { if (m<n) {
fwrite(buffer, 1, remain, out_file); int e = ferror(in_file);
writeLen += remain; if (e) return -1;
} }
if (m==0) break;
if (m!=fwrite(buf, 1, m, out_file)) {
return -1;
}
len -= m;
} }
return count - len;
return writeLen;
} }
int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t* offset, int64_t count) { int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t* offset, int64_t count) {
lseek(sfd, (int32_t)(*offset), 0); int r = 0;
int64_t writeLen = 0; if (offset) {
uint8_t buffer[_SEND_FILE_STEP_] = { 0 }; r = lseek(sfd, *offset, SEEK_SET);
if (r==-1) return -1;
for (int64_t len = 0; len < (count - _SEND_FILE_STEP_); len += _SEND_FILE_STEP_) {
int32_t rlen = (int32_t)read(sfd, buffer, _SEND_FILE_STEP_);
if (rlen <= 0) {
return writeLen;
}
else if (rlen < _SEND_FILE_STEP_) {
taosWriteSocket(dfd, buffer, rlen);
return (int64_t)(writeLen + rlen);
}
else {
taosWriteSocket(dfd, buffer, _SEND_FILE_STEP_);
writeLen += _SEND_FILE_STEP_;
}
} }
off_t len = count;
int64_t remain = count - writeLen; while (len>0) {
if (remain > 0) { char buf[1024*16];
int32_t rlen = read(sfd, buffer, (int32_t)remain); off_t n = sizeof(buf);
if (rlen <= 0) { if (len<n) n = len;
return writeLen; size_t m = read(sfd, buf, n);
} if (m==-1) return -1;
else { if (m==0) break;
taosWriteSocket(sfd, buffer, (int32_t)remain); size_t l = write(dfd, buf, m);
writeLen += remain; if (l==-1) return -1;
} len -= l;
} }
return count - len;
return writeLen;
} }
...@@ -21,6 +21,8 @@ ...@@ -21,6 +21,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include <libproc.h>
// #define SEM_USE_PTHREAD // #define SEM_USE_PTHREAD
// #define SEM_USE_POSIX // #define SEM_USE_POSIX
#define SEM_USE_SEM #define SEM_USE_SEM
...@@ -279,3 +281,41 @@ int tsem_destroy(tsem_t *sem) { ...@@ -279,3 +281,41 @@ int tsem_destroy(tsem_t *sem) {
return 0; return 0;
} }
bool taosCheckPthreadValid(pthread_t thread) {
uint64_t id = 0;
int r = pthread_threadid_np(thread, &id);
return r ? false : true;
}
int64_t taosGetSelfPthreadId() {
return (int64_t)pthread_self();
}
int64_t taosGetPthreadId(pthread_t thread) {
return (int64_t)thread;
}
void taosResetPthread(pthread_t* thread) {
*thread = NULL;
}
bool taosComparePthread(pthread_t first, pthread_t second) {
return pthread_equal(first, second) ? true : false;
}
int32_t taosGetPId() {
return (int32_t)getpid();
}
int32_t taosGetCurrentAPPName(char* name, int32_t* len) {
char buf[PATH_MAX+1];
buf[0] = '\0';
proc_name(getpid(), buf, sizeof(buf)-1);
buf[PATH_MAX] = '\0';
size_t n = strlen(buf);
if (len) *len = n;
if (name) strcpy(name, buf);
return 0;
}
...@@ -24,42 +24,134 @@ ...@@ -24,42 +24,134 @@
static void taosGetSystemTimezone() { static void taosGetSystemTimezone() {
// get and set default timezone
SGlobalCfg *cfg_timezone = taosGetConfigOption("timezone"); SGlobalCfg *cfg_timezone = taosGetConfigOption("timezone");
if (cfg_timezone && cfg_timezone->cfgStatus < TAOS_CFG_CSTATUS_DEFAULT) { if (cfg_timezone == NULL) return;
char *tz = getenv("TZ"); if (cfg_timezone->cfgStatus >= TAOS_CFG_CSTATUS_DEFAULT) {
if (tz == NULL || strlen(tz) == 0) { return;
strcpy(tsTimezone, "not configured"); }
/* load time zone string from /etc/localtime */
char buf[4096];
char *tz = NULL; {
int n = readlink("/etc/localtime", buf, sizeof(buf));
if (n<0) {
uError("read /etc/localtime error, reason:%s", strerror(errno));
return;
}
buf[n] = '\0';
for(int i=n-1; i>=0; --i) {
if (buf[i]=='/') {
if (tz) {
tz = buf + i + 1;
break;
}
tz = buf + i + 1;
}
} }
else { if (!tz || 0==strchr(tz, '/')) {
strcpy(tsTimezone, tz); uError("parsing /etc/localtime failed");
return;
} }
cfg_timezone->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT;
uInfo("timezone not configured, use default"); setenv("TZ", tz, 1);
tzset();
} }
/*
* NOTE: do not remove it.
* Enforce set the correct daylight saving time(DST) flag according
* to current time
*/
time_t tx1 = time(NULL);
struct tm tm1;
localtime_r(&tx1, &tm1);
/*
* format example:
*
* Asia/Shanghai (CST, +0800)
* Europe/London (BST, +0100)
*/
snprintf(tsTimezone, TSDB_TIMEZONE_LEN, "%s (%s, %+03ld00)",
tz, tm1.tm_isdst ? tzname[daylight] : tzname[0], -timezone/3600);
// cfg_timezone->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT;
uWarn("timezone not configured, set to system default:%s", tsTimezone);
} }
static void taosGetSystemLocale() { /*
// get and set default locale * originally from src/os/src/detail/osSysinfo.c
* POSIX format locale string:
* (Language Strings)_(Country/Region Strings).(code_page)
*
* example: en_US.UTF-8, zh_CN.GB18030, zh_CN.UTF-8,
*
* if user does not specify the locale in taos.cfg the program use default LC_CTYPE as system locale.
*
* In case of some CentOS systems, their default locale is "en_US.utf8", which is not valid code_page
* for libiconv that is employed to convert string in this system. This program will automatically use
* UTF-8 instead as the charset.
*
* In case of windows client, the locale string is not valid POSIX format, user needs to set the
* correct code_page for libiconv. Usually, the code_page of windows system with simple chinese is
* CP936, CP437 for English charset.
*
*/
static void taosGetSystemLocale() { // get and set default locale
char sep = '.';
char *locale = NULL;
SGlobalCfg *cfg_locale = taosGetConfigOption("locale"); SGlobalCfg *cfg_locale = taosGetConfigOption("locale");
if (cfg_locale && cfg_locale->cfgStatus < TAOS_CFG_CSTATUS_DEFAULT) { if (cfg_locale && cfg_locale->cfgStatus < TAOS_CFG_CSTATUS_DEFAULT) {
char *locale = setlocale(LC_CTYPE, "chs"); locale = setlocale(LC_CTYPE, "");
if (locale != NULL) { if (locale == NULL) {
uError("can't get locale from system, set it to en_US.UTF-8 since error:%d:%s", errno, strerror(errno));
strcpy(tsLocale, "en_US.UTF-8");
} else {
tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN); tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN);
cfg_locale->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT; uWarn("locale not configured, set to system default:%s", tsLocale);
uInfo("locale not configured, set to default:%s", tsLocale);
} }
} }
/* if user does not specify the charset, extract it from locale */
SGlobalCfg *cfg_charset = taosGetConfigOption("charset"); SGlobalCfg *cfg_charset = taosGetConfigOption("charset");
if (cfg_charset && cfg_charset->cfgStatus < TAOS_CFG_CSTATUS_DEFAULT) { if (cfg_charset && cfg_charset->cfgStatus < TAOS_CFG_CSTATUS_DEFAULT) {
strcpy(tsCharset, "cp936"); char *str = strrchr(tsLocale, sep);
cfg_charset->cfgStatus = TAOS_CFG_CSTATUS_DEFAULT; if (str != NULL) {
uInfo("charset not configured, set to default:%s", tsCharset); str++;
char *revisedCharset = taosCharsetReplace(str);
tstrncpy(tsCharset, revisedCharset, TSDB_LOCALE_LEN);
free(revisedCharset);
uWarn("charset not configured, set to system default:%s", tsCharset);
} else {
strcpy(tsCharset, "UTF-8");
uWarn("can't get locale and charset from system, set it to UTF-8");
}
} }
} }
void taosPrintOsInfo() {} void taosPrintOsInfo() {
uInfo(" os pageSize: %" PRId64 "(KB)", tsPageSize / 1024);
// uInfo(" os openMax: %" PRId64, tsOpenMax);
// uInfo(" os streamMax: %" PRId64, tsStreamMax);
uInfo(" os numOfCores: %d", tsNumOfCores);
uInfo(" os totalDisk: %f(GB)", tsTotalDataDirGB);
uInfo(" os totalMemory: %d(MB)", tsTotalMemoryMB);
struct utsname buf;
if (uname(&buf)) {
uInfo(" can't fetch os info");
return;
}
uInfo(" os sysname: %s", buf.sysname);
uInfo(" os nodename: %s", buf.nodename);
uInfo(" os release: %s", buf.release);
uInfo(" os version: %s", buf.version);
uInfo(" os machine: %s", buf.machine);
uInfo("==================================");
}
void taosKillSystem() { void taosKillSystem() {
uError("function taosKillSystem, exit!"); uError("function taosKillSystem, exit!");
...@@ -67,6 +159,22 @@ void taosKillSystem() { ...@@ -67,6 +159,22 @@ void taosKillSystem() {
} }
void taosGetSystemInfo() { void taosGetSystemInfo() {
// taosGetProcInfos();
tsNumOfCores = sysconf(_SC_NPROCESSORS_ONLN);
long physical_pages = sysconf(_SC_PHYS_PAGES);
long page_size = sysconf(_SC_PAGESIZE);
tsTotalMemoryMB = physical_pages * page_size / (1024 * 1024);
tsPageSize = page_size;
// float tmp1, tmp2;
// taosGetSysMemory(&tmp1);
// taosGetProcMemory(&tmp2);
// taosGetDisk();
// taosGetBandSpeed(&tmp1);
// taosGetCpuUsage(&tmp1, &tmp2);
// taosGetProcIO(&tmp1, &tmp2);
taosGetSystemTimezone(); taosGetSystemTimezone();
taosGetSystemLocale(); taosGetSystemLocale();
} }
...@@ -121,7 +229,6 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { ...@@ -121,7 +229,6 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
char cmdline[1024]; char cmdline[1024];
char *taosGetCmdlineByPID(int pid) { char *taosGetCmdlineByPID(int pid) {
errno = 0; errno = 0;
if (proc_pidpath(pid, cmdline, sizeof(cmdline)) <= 0) { if (proc_pidpath(pid, cmdline, sizeof(cmdline)) <= 0) {
...@@ -136,6 +243,7 @@ bool taosGetSystemUid(char *uid) { ...@@ -136,6 +243,7 @@ bool taosGetSystemUid(char *uid) {
uuid_t uuid = {0}; uuid_t uuid = {0};
uuid_generate(uuid); uuid_generate(uuid);
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null // it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
uuid_unparse(uuid, uid); uuid_unparse_lower(uuid, uid);
return true; return true;
} }
...@@ -62,4 +62,4 @@ int32_t taosGetCurrentAPPName(char *name, int32_t* len) { ...@@ -62,4 +62,4 @@ int32_t taosGetCurrentAPPName(char *name, int32_t* len) {
return 0; return 0;
} }
#endif #endif
\ No newline at end of file
...@@ -92,6 +92,11 @@ void httpStopSystem() { ...@@ -92,6 +92,11 @@ void httpStopSystem() {
tsHttpServer.stop = 1; tsHttpServer.stop = 1;
#ifdef WINDOWS #ifdef WINDOWS
closesocket(tsHttpServer.fd); closesocket(tsHttpServer.fd);
#elif __APPLE__
if (tsHttpServer.fd!=-1) {
close(tsHttpServer.fd);
tsHttpServer.fd = -1;
}
#else #else
shutdown(tsHttpServer.fd, SHUT_RD); shutdown(tsHttpServer.fd, SHUT_RD);
#endif #endif
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "tsocket.h" #include "tsocket.h"
#include "tutil.h" #include "tutil.h"
#include "taosdef.h" #include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
#include "rpcLog.h" #include "rpcLog.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "rpcTcp.h" #include "rpcTcp.h"
...@@ -81,7 +81,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -81,7 +81,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1); pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
if (pServerObj == NULL) { if (pServerObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
...@@ -95,7 +95,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -95,7 +95,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads); pServerObj->pThreadObj = (SThreadObj **)calloc(sizeof(SThreadObj *), numOfThreads);
if (pServerObj->pThreadObj == NULL) { if (pServerObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
free(pServerObj); free(pServerObj);
return NULL; return NULL;
} }
...@@ -105,18 +105,18 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -105,18 +105,18 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
// initialize parameters in case it may encounter error later // initialize parameters in case it may encounter error later
for (int i = 0; i < numOfThreads; ++i) { for (int i = 0; i < numOfThreads; ++i) {
pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1); pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), 1);
if (pThreadObj == NULL) { if (pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
for (int j=0; j<i; ++j) free(pServerObj->pThreadObj[j]); for (int j=0; j<i; ++j) free(pServerObj->pThreadObj[j]);
free(pServerObj->pThreadObj); free(pServerObj->pThreadObj);
free(pServerObj); free(pServerObj);
return NULL; return NULL;
} }
pServerObj->pThreadObj[i] = pThreadObj; pServerObj->pThreadObj[i] = pThreadObj;
pThreadObj->pollFd = -1; pThreadObj->pollFd = -1;
taosResetPthread(&pThreadObj->thread); taosResetPthread(&pThreadObj->thread);
...@@ -152,9 +152,9 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -152,9 +152,9 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
} }
pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (pServerObj->fd < 0) code = -1; if (pServerObj->fd < 0) code = -1;
if (code == 0) { if (code == 0) {
code = pthread_create(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj); code = pthread_create(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj);
if (code != 0) { if (code != 0) {
tError("%s failed to create TCP accept thread(%s)", label, strerror(code)); tError("%s failed to create TCP accept thread(%s)", label, strerror(code));
...@@ -162,7 +162,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -162,7 +162,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
} }
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
taosCleanUpTcpServer(pServerObj); taosCleanUpTcpServer(pServerObj);
pServerObj = NULL; pServerObj = NULL;
} else { } else {
...@@ -175,8 +175,8 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -175,8 +175,8 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
static void taosStopTcpThread(SThreadObj* pThreadObj) { static void taosStopTcpThread(SThreadObj* pThreadObj) {
if (pThreadObj == NULL) { return;} if (pThreadObj == NULL) { return;}
// save thread into local variable and signal thread to stop // save thread into local variable and signal thread to stop
pthread_t thread = pThreadObj->thread; pthread_t thread = pThreadObj->thread;
if (!taosCheckPthreadValid(thread)) { if (!taosCheckPthreadValid(thread)) {
return; return;
} }
...@@ -197,6 +197,11 @@ void taosStopTcpServer(void *handle) { ...@@ -197,6 +197,11 @@ void taosStopTcpServer(void *handle) {
if (pServerObj->fd >= 0) { if (pServerObj->fd >= 0) {
#ifdef WINDOWS #ifdef WINDOWS
closesocket(pServerObj->fd); closesocket(pServerObj->fd);
#elif defined(__APPLE__)
if (pServerObj->fd!=-1) {
close(pServerObj->fd);
pServerObj->fd = -1;
}
#else #else
shutdown(pServerObj->fd, SHUT_RD); shutdown(pServerObj->fd, SHUT_RD);
#endif #endif
...@@ -265,7 +270,7 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -265,7 +270,7 @@ static void *taosAcceptTcpConnection(void *arg) {
taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port));
continue; continue;
} }
// pick up the thread to handle this connection // pick up the thread to handle this connection
pThreadObj = pServerObj->pThreadObj[threadId]; pThreadObj = pServerObj->pThreadObj[threadId];
...@@ -274,13 +279,13 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -274,13 +279,13 @@ static void *taosAcceptTcpConnection(void *arg) {
if (pFdObj) { if (pFdObj) {
pFdObj->ip = caddr.sin_addr.s_addr; pFdObj->ip = caddr.sin_addr.s_addr;
pFdObj->port = htons(caddr.sin_port); pFdObj->port = htons(caddr.sin_port);
tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
} else { } else {
taosCloseSocket(connFd); taosCloseSocket(connFd);
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port));
} }
// pick up next thread for next connection // pick up next thread for next connection
threadId++; threadId++;
...@@ -295,17 +300,17 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -295,17 +300,17 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread
SClientObj *pClientObj = (SClientObj *)calloc(1, sizeof(SClientObj)); SClientObj *pClientObj = (SClientObj *)calloc(1, sizeof(SClientObj));
if (pClientObj == NULL) { if (pClientObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
tstrncpy(pClientObj->label, label, sizeof(pClientObj->label)); tstrncpy(pClientObj->label, label, sizeof(pClientObj->label));
pClientObj->numOfThreads = numOfThreads; pClientObj->numOfThreads = numOfThreads;
pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj*)); pClientObj->pThreadObj = (SThreadObj **)calloc(numOfThreads, sizeof(SThreadObj*));
if (pClientObj->pThreadObj == NULL) { if (pClientObj->pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
tfree(pClientObj); tfree(pClientObj);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
} }
int code = 0; int code = 0;
...@@ -317,7 +322,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -317,7 +322,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread
SThreadObj *pThreadObj = (SThreadObj *)calloc(1, sizeof(SThreadObj)); SThreadObj *pThreadObj = (SThreadObj *)calloc(1, sizeof(SThreadObj));
if (pThreadObj == NULL) { if (pThreadObj == NULL) {
tError("TCP:%s no enough memory", label); tError("TCP:%s no enough memory", label);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
for (int j=0; j<i; ++j) free(pClientObj->pThreadObj[j]); for (int j=0; j<i; ++j) free(pClientObj->pThreadObj[j]);
free(pClientObj); free(pClientObj);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
...@@ -356,11 +361,11 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -356,11 +361,11 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int numOfThread
pThreadObj->threadId = i; pThreadObj->threadId = i;
} }
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
taosCleanUpTcpClient(pClientObj); taosCleanUpTcpClient(pClientObj);
pClientObj = NULL; pClientObj = NULL;
} }
return pClientObj; return pClientObj;
} }
void taosStopTcpClient(void *chandle) { void taosStopTcpClient(void *chandle) {
...@@ -374,20 +379,20 @@ void taosStopTcpClient(void *chandle) { ...@@ -374,20 +379,20 @@ void taosStopTcpClient(void *chandle) {
void taosCleanUpTcpClient(void *chandle) { void taosCleanUpTcpClient(void *chandle) {
SClientObj *pClientObj = chandle; SClientObj *pClientObj = chandle;
if (pClientObj == NULL) return; if (pClientObj == NULL) return;
for (int i = 0; i < pClientObj->numOfThreads; ++i) { for (int i = 0; i < pClientObj->numOfThreads; ++i) {
SThreadObj *pThreadObj= pClientObj->pThreadObj[i]; SThreadObj *pThreadObj= pClientObj->pThreadObj[i];
taosStopTcpThread(pThreadObj); taosStopTcpThread(pThreadObj);
} }
tDebug("%s TCP client is cleaned up", pClientObj->label); tDebug("%s TCP client is cleaned up", pClientObj->label);
tfree(pClientObj->pThreadObj); tfree(pClientObj->pThreadObj);
tfree(pClientObj); tfree(pClientObj);
} }
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
SClientObj * pClientObj = shandle; SClientObj * pClientObj = shandle;
int32_t index = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads; int32_t index = atomic_load_32(&pClientObj->index) % pClientObj->numOfThreads;
atomic_store_32(&pClientObj->index, index + 1); atomic_store_32(&pClientObj->index, index + 1);
SThreadObj *pThreadObj = pClientObj->pThreadObj[index]; SThreadObj *pThreadObj = pClientObj->pThreadObj[index];
SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
...@@ -402,12 +407,12 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin ...@@ -402,12 +407,12 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
} }
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd); SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
if (pFdObj) { if (pFdObj) {
pFdObj->thandle = thandle; pFdObj->thandle = thandle;
pFdObj->port = port; pFdObj->port = port;
pFdObj->ip = ip; pFdObj->ip = ip;
tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d", tDebug("%s %p TCP connection to 0x%x:%hu is created, localPort:%hu FD:%p numOfFds:%d",
pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds); pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds);
} else { } else {
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
...@@ -422,7 +427,7 @@ void taosCloseTcpConnection(void *chandle) { ...@@ -422,7 +427,7 @@ void taosCloseTcpConnection(void *chandle) {
if (pFdObj == NULL || pFdObj->signature != pFdObj) return; if (pFdObj == NULL || pFdObj->signature != pFdObj) return;
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj); tDebug("%s %p TCP connection will be closed, FD:%p", pThreadObj->label, pFdObj->thandle, pFdObj);
// pFdObj->thandle = NULL; // pFdObj->thandle = NULL;
pFdObj->closedByApp = 1; pFdObj->closedByApp = 1;
...@@ -435,7 +440,7 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand ...@@ -435,7 +440,7 @@ int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chand
SThreadObj *pThreadObj = pFdObj->pThreadObj; SThreadObj *pThreadObj = pFdObj->pThreadObj;
int ret = taosWriteMsg(pFdObj->fd, data, len); int ret = taosWriteMsg(pFdObj->fd, data, len);
tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret); tTrace("%s %p TCP data is sent, FD:%p fd:%d bytes:%d", pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, ret);
return ret; return ret;
} }
...@@ -458,7 +463,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) { ...@@ -458,7 +463,7 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
recvInfo.chandle = NULL; recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP; recvInfo.connType = RPC_CONN_TCP;
(*(pThreadObj->processData))(&recvInfo); (*(pThreadObj->processData))(&recvInfo);
} }
taosFreeFdObj(pFdObj); taosFreeFdObj(pFdObj);
} }
...@@ -473,7 +478,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { ...@@ -473,7 +478,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead)); headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
if (headLen != sizeof(SRpcHead)) { if (headLen != sizeof(SRpcHead)) {
tDebug("%s %p read error, FD:%p headLen:%d", pThreadObj->label, pFdObj->thandle, pFdObj, headLen); tDebug("%s %p read error, FD:%p headLen:%d", pThreadObj->label, pFdObj->thandle, pFdObj, headLen);
return -1; return -1;
} }
msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
...@@ -491,14 +496,14 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { ...@@ -491,14 +496,14 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen); retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
if (leftLen != retLen) { if (leftLen != retLen) {
tError("%s %p read error, leftLen:%d retLen:%d FD:%p", tError("%s %p read error, leftLen:%d retLen:%d FD:%p",
pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj); pThreadObj->label, pFdObj->thandle, leftLen, retLen, pFdObj);
free(buffer); free(buffer);
return -1; return -1;
} }
memcpy(msg, &rpcHead, sizeof(SRpcHead)); memcpy(msg, &rpcHead, sizeof(SRpcHead));
pInfo->msg = msg; pInfo->msg = msg;
pInfo->msgLen = msgLen; pInfo->msgLen = msgLen;
pInfo->ip = pFdObj->ip; pInfo->ip = pFdObj->ip;
...@@ -509,7 +514,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { ...@@ -509,7 +514,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
pInfo->connType = RPC_CONN_TCP; pInfo->connType = RPC_CONN_TCP;
if (pFdObj->closedByApp) { if (pFdObj->closedByApp) {
free(buffer); free(buffer);
return -1; return -1;
} }
...@@ -557,7 +562,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -557,7 +562,7 @@ static void *taosProcessTcpData(void *param) {
} }
if (taosReadTcpData(pFdObj, &recvInfo) < 0) { if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
shutdown(pFdObj->fd, SHUT_WR); shutdown(pFdObj->fd, SHUT_WR);
continue; continue;
} }
...@@ -565,7 +570,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -565,7 +570,7 @@ static void *taosProcessTcpData(void *param) {
if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj); if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
} }
if (pThreadObj->stop) break; if (pThreadObj->stop) break;
} }
if (pThreadObj->pollFd >=0) { if (pThreadObj->pollFd >=0) {
...@@ -603,7 +608,7 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) { ...@@ -603,7 +608,7 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
event.data.ptr = pFdObj; event.data.ptr = pFdObj;
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
tfree(pFdObj); tfree(pFdObj);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
...@@ -637,7 +642,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -637,7 +642,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pThreadObj->numOfFds--; pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0) if (pThreadObj->numOfFds < 0)
tError("%s %p TCP thread:%d, number of FDs is negative!!!", tError("%s %p TCP thread:%d, number of FDs is negative!!!",
pThreadObj->label, pFdObj->thandle, pThreadObj->threadId); pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
if (pFdObj->prev) { if (pFdObj->prev) {
...@@ -652,7 +657,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -652,7 +657,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
pthread_mutex_unlock(&pThreadObj->mutex); pthread_mutex_unlock(&pThreadObj->mutex);
tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d", tDebug("%s %p TCP connection is closed, FD:%p fd:%d numOfFds:%d",
pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pThreadObj->numOfFds); pThreadObj->label, pFdObj->thandle, pFdObj, pFdObj->fd, pThreadObj->numOfFds);
tfree(pFdObj); tfree(pFdObj);
......
...@@ -103,6 +103,11 @@ void syncCloseTcpThreadPool(void *param) { ...@@ -103,6 +103,11 @@ void syncCloseTcpThreadPool(void *param) {
#ifdef WINDOWS #ifdef WINDOWS
closesocket(pPool->acceptFd); closesocket(pPool->acceptFd);
#elif defined(__APPLE__)
if (pPool->acceptFd!=-1) {
close(pPool->acceptFd);
pPool->acceptFd = -1;
}
#else #else
shutdown(pPool->acceptFd, SHUT_RD); shutdown(pPool->acceptFd, SHUT_RD);
#endif #endif
......
...@@ -32,14 +32,27 @@ int32_t taosGetFqdn(char *fqdn) { ...@@ -32,14 +32,27 @@ int32_t taosGetFqdn(char *fqdn) {
struct addrinfo hints = {0}; struct addrinfo hints = {0};
struct addrinfo *result = NULL; struct addrinfo *result = NULL;
#ifdef __APPLE__
// on macosx, hostname -f has the form of xxx.local
// which will block getaddrinfo for a few seconds if AI_CANONNAME is set
// thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return
// immediately
hints.ai_family = AF_INET;
#else // __APPLE__
hints.ai_flags = AI_CANONNAME; hints.ai_flags = AI_CANONNAME;
#endif // __APPLE__
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result); int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) { if (!result) {
uError("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); uError("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1; return -1;
} }
#ifdef __APPLE__
// refer to comments above
strcpy(fqdn, hostname);
#else // __APPLE__
strcpy(fqdn, result->ai_canonname); strcpy(fqdn, result->ai_canonname);
#endif // __APPLE__
freeaddrinfo(result); freeaddrinfo(result);
return 0; return 0;
} }
...@@ -286,7 +299,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie ...@@ -286,7 +299,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
int32_t bufSize = 1024 * 1024; int32_t bufSize = 1024 * 1024;
sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockFd <= 2) { if (sockFd <= 2) {
uError("failed to open the socket: %d (%s)", errno, strerror(errno)); uError("failed to open the socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd); taosCloseSocketNoCheck(sockFd);
......
...@@ -38,6 +38,8 @@ ...@@ -38,6 +38,8 @@
#include <string.h> #include <string.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <libgen.h> #include <libgen.h>
#include <locale.h>
#include <netdb.h>
#define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__) #define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
#define A(statement, fmt, ...) do { \ #define A(statement, fmt, ...) do { \
...@@ -56,6 +58,8 @@ ...@@ -56,6 +58,8 @@
##__VA_ARGS__); \ ##__VA_ARGS__); \
} while (0) } while (0)
#include "os.h"
typedef struct ep_s ep_t; typedef struct ep_s ep_t;
struct ep_s { struct ep_s {
int ep; int ep;
...@@ -214,7 +218,6 @@ static void* routine(void* arg) { ...@@ -214,7 +218,6 @@ static void* routine(void* arg) {
A(0==pthread_mutex_lock(&ep->lock), ""); A(0==pthread_mutex_lock(&ep->lock), "");
ep->wakenup = 0; ep->wakenup = 0;
A(0==pthread_mutex_unlock(&ep->lock), ""); A(0==pthread_mutex_unlock(&ep->lock), "");
D("........");
continue; continue;
} }
A(ev->data.ptr, "internal logic error"); A(ev->data.ptr, "internal logic error");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册