提交 ac49a78d 编写于 作者: S Shengliang Guan

TD-1207

上级 f4539411
...@@ -13,7 +13,7 @@ ENDIF () ...@@ -13,7 +13,7 @@ ENDIF ()
SET(TD_ACCOUNT FALSE) SET(TD_ACCOUNT FALSE)
SET(TD_ADMIN FALSE) SET(TD_ADMIN FALSE)
SET(TD_GRANT FALSE) SET(TD_GRANT FALSE)
SET(TD_MQTT TRUE) SET(TD_MQTT FALSE)
SET(TD_TSDB_PLUGINS FALSE) SET(TD_TSDB_PLUGINS FALSE)
SET(TD_COVER FALSE) SET(TD_COVER FALSE)
...@@ -29,6 +29,11 @@ MESSAGE(STATUS "Community directory: " ${TD_COMMUNITY_DIR}) ...@@ -29,6 +29,11 @@ MESSAGE(STATUS "Community directory: " ${TD_COMMUNITY_DIR})
INCLUDE(cmake/input.inc) INCLUDE(cmake/input.inc)
INCLUDE(cmake/platform.inc) INCLUDE(cmake/platform.inc)
IF (TD_WINDOWS)
SET(TD_SOMODE_STATIC TRUE)
ENDIF ()
INCLUDE(cmake/define.inc) INCLUDE(cmake/define.inc)
INCLUDE(cmake/env.inc) INCLUDE(cmake/env.inc)
INCLUDE(cmake/version.inc) INCLUDE(cmake/version.inc)
......
...@@ -8,6 +8,6 @@ INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) ...@@ -8,6 +8,6 @@ INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX) IF (TD_LINUX OR TD_WINDOWS)
ADD_LIBRARY(balance ${SRC}) ADD_LIBRARY(balance ${SRC})
ENDIF () ENDIF ()
...@@ -24,7 +24,7 @@ extern "C" { ...@@ -24,7 +24,7 @@ extern "C" {
int32_t bnInitThread(); int32_t bnInitThread();
void bnCleanupThread(); void bnCleanupThread();
void bnNotify(); void bnNotify();
void bnStartTimer(int64_t mseconds); void bnStartTimer(int32_t mseconds);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
extern int64_t tsDnodeRid; extern int64_t tsDnodeRid;
extern int64_t tsSdbRid; extern int32_t tsSdbRid;
static SBnMgmt tsBnMgmt; static SBnMgmt tsBnMgmt;
static void bnMonitorDnodeModule(); static void bnMonitorDnodeModule();
......
...@@ -271,23 +271,23 @@ static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void ...@@ -271,23 +271,23 @@ static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = systemScore; *(float *)pWrite = (float)systemScore;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = pDnode->customScore; *(float *)pWrite = (float)pDnode->customScore;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (int32_t)moduleScore; *(float *)pWrite = (float)moduleScore;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (int32_t)vnodeScore; *(float *)pWrite = (float)vnodeScore;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(float *)pWrite = (int32_t)(vnodeScore + moduleScore + pDnode->customScore + systemScore); *(float *)pWrite = (float)(vnodeScore + moduleScore + pDnode->customScore + systemScore);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
...@@ -119,7 +119,7 @@ static void bnProcessTimer(void *handle, void *tmrId) { ...@@ -119,7 +119,7 @@ static void bnProcessTimer(void *handle, void *tmrId) {
} }
} }
void bnStartTimer(int64_t mseconds) { void bnStartTimer(int32_t mseconds) {
if (tsBnThread.stop) return; if (tsBnThread.stop) return;
bool updateSoon = (mseconds != -1); bool updateSoon = (mseconds != -1);
......
...@@ -6,7 +6,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) ...@@ -6,7 +6,7 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
IF (TD_LINUX) IF (TD_LINUX OR TD_WINDOWS)
ADD_LIBRARY(tcq ${SRC}) ADD_LIBRARY(tcq ${SRC})
IF (TD_SOMODE_STATIC) IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(tcq tutil common taos_static) TARGET_LINK_LIBRARIES(tcq tutil common taos_static)
......
...@@ -343,7 +343,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -343,7 +343,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
char buf[TSDB_MAX_NCHAR_LEN]; char buf[TSDB_MAX_NCHAR_LEN];
int32_t len = taos_fetch_lengths(tres)[i]; int32_t len = taos_fetch_lengths(tres)[i];
taosMbsToUcs4(val, len, buf, sizeof(buf), &len); taosMbsToUcs4(val, len, buf, sizeof(buf), &len);
memcpy(val + sizeof(VarDataLenT), buf, len); memcpy((char *)val + sizeof(VarDataLenT), buf, len);
varDataLen(val) = len; varDataLen(val) = len;
} }
tdAppendColVal(trow, val, c->type, c->bytes, c->offset); tdAppendColVal(trow, val, c->type, c->bytes, c->offset);
......
...@@ -3,4 +3,4 @@ PROJECT(TDengine) ...@@ -3,4 +3,4 @@ PROJECT(TDengine)
LIST(APPEND CQTEST_SRC ./cqtest.c) LIST(APPEND CQTEST_SRC ./cqtest.c)
ADD_EXECUTABLE(cqtest ${CQTEST_SRC}) ADD_EXECUTABLE(cqtest ${CQTEST_SRC})
TARGET_LINK_LIBRARIES(cqtest tcq) TARGET_LINK_LIBRARIES(cqtest tcq taos_static)
...@@ -10,7 +10,7 @@ INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) ...@@ -10,7 +10,7 @@ INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX) IF (TD_LINUX OR TD_WINDOWS)
ADD_EXECUTABLE(taosd ${SRC}) ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync) TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync)
...@@ -28,7 +28,7 @@ IF (TD_LINUX) ...@@ -28,7 +28,7 @@ IF (TD_LINUX)
TARGET_LINK_LIBRARIES(taosd grant) TARGET_LINK_LIBRARIES(taosd grant)
ENDIF () ENDIF ()
IF (TD_MQTT) IF (TD_LINUX AND TD_MQTT)
TARGET_LINK_LIBRARIES(taosd mqtt) TARGET_LINK_LIBRARIES(taosd mqtt)
ENDIF () ENDIF ()
......
...@@ -97,7 +97,7 @@ static int32_t dnodeReadCfg() { ...@@ -97,7 +97,7 @@ static int32_t dnodeReadCfg() {
goto PARSE_CFG_OVER; goto PARSE_CFG_OVER;
} }
len = fread(content, 1, maxLen, fp); len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) { if (len <= 0) {
dError("failed to read %s, content is null", file); dError("failed to read %s, content is null", file);
goto PARSE_CFG_OVER; goto PARSE_CFG_OVER;
...@@ -115,7 +115,7 @@ static int32_t dnodeReadCfg() { ...@@ -115,7 +115,7 @@ static int32_t dnodeReadCfg() {
dError("failed to read %s, dnodeId not found", file); dError("failed to read %s, dnodeId not found", file);
goto PARSE_CFG_OVER; goto PARSE_CFG_OVER;
} }
cfg.dnodeId = dnodeId->valueint; cfg.dnodeId = (int32_t)dnodeId->valueint;
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) { if (!clusterId || clusterId->type != cJSON_String) {
......
...@@ -29,8 +29,8 @@ typedef struct { ...@@ -29,8 +29,8 @@ typedef struct {
static SCheckItem tsCheckItem[TSDB_CHECK_ITEM_MAX] = {{0}}; static SCheckItem tsCheckItem[TSDB_CHECK_ITEM_MAX] = {{0}};
int64_t tsMinFreeMemSizeForStart = 0; int64_t tsMinFreeMemSizeForStart = 0;
static int bindTcpPort(int port) { static int32_t bindTcpPort(int32_t port) {
int serverSocket; SOCKET serverSocket;
struct sockaddr_in server_addr; struct sockaddr_in server_addr;
if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
...@@ -45,22 +45,22 @@ static int bindTcpPort(int port) { ...@@ -45,22 +45,22 @@ static int bindTcpPort(int port) {
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
dError("port:%d tcp bind() fail: %s", port, strerror(errno)); dError("port:%d tcp bind() fail: %s", port, strerror(errno));
close(serverSocket); taosCloseSocket(serverSocket);
return -1; return -1;
} }
if (listen(serverSocket, 5) < 0) { if (listen(serverSocket, 5) < 0) {
dError("port:%d listen() fail: %s", port, strerror(errno)); dError("port:%d listen() fail: %s", port, strerror(errno));
close(serverSocket); taosCloseSocket(serverSocket);
return -1; return -1;
} }
close(serverSocket); taosCloseSocket(serverSocket);
return 0; return 0;
} }
static int bindUdpPort(int port) { static int32_t bindUdpPort(int32_t port) {
int serverSocket; SOCKET serverSocket;
struct sockaddr_in server_addr; struct sockaddr_in server_addr;
if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
...@@ -75,19 +75,19 @@ static int bindUdpPort(int port) { ...@@ -75,19 +75,19 @@ static int bindUdpPort(int port) {
if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) { if (bind(serverSocket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
dError("port:%d udp bind() fail: %s", port, strerror(errno)); dError("port:%d udp bind() fail: %s", port, strerror(errno));
close(serverSocket); taosCloseSocket(serverSocket);
return -1; return -1;
} }
close(serverSocket); taosCloseSocket(serverSocket);
return 0; return 0;
} }
static int dnodeCheckNetwork() { static int32_t dnodeCheckNetwork() {
int ret; int32_t ret;
int startPort = tsServerPort; int32_t startPort = tsServerPort;
for (int port = startPort; port < startPort + 12; port++) { for (int32_t port = startPort; port < startPort + 12; port++) {
ret = bindTcpPort(port); ret = bindTcpPort(port);
if (0 != ret) { if (0 != ret) {
dError("failed to tcp bind port %d, quit", port); dError("failed to tcp bind port %d, quit", port);
...@@ -103,7 +103,7 @@ static int dnodeCheckNetwork() { ...@@ -103,7 +103,7 @@ static int dnodeCheckNetwork() {
return 0; return 0;
} }
static int dnodeCheckMem() { static int32_t dnodeCheckMem() {
float memoryUsedMB; float memoryUsedMB;
float memoryAvailMB; float memoryAvailMB;
if (true != taosGetSysMemory(&memoryUsedMB)) { if (true != taosGetSysMemory(&memoryUsedMB)) {
...@@ -121,12 +121,12 @@ static int dnodeCheckMem() { ...@@ -121,12 +121,12 @@ static int dnodeCheckMem() {
return 0; return 0;
} }
static int dnodeCheckCpu() { static int32_t dnodeCheckCpu() {
// TODO: // TODO:
return 0; return 0;
} }
static int dnodeCheckDisk() { static int32_t dnodeCheckDisk() {
taosGetDisk(); taosGetDisk();
if (tsAvailDataDirGB < tsMinimalDataDirGB) { if (tsAvailDataDirGB < tsMinimalDataDirGB) {
...@@ -147,24 +147,24 @@ static int dnodeCheckDisk() { ...@@ -147,24 +147,24 @@ static int dnodeCheckDisk() {
return 0; return 0;
} }
static int dnodeCheckOs() { static int32_t dnodeCheckOs() {
// TODO: // TODO:
return 0; return 0;
} }
static int dnodeCheckAccess() { static int32_t dnodeCheckAccess() {
// TODO: // TODO:
return 0; return 0;
} }
static int dnodeCheckVersion() { static int32_t dnodeCheckVersion() {
// TODO: // TODO:
return 0; return 0;
} }
static int dnodeCheckDatafile() { static int32_t dnodeCheckDatafile() {
// TODO: // TODO:
return 0; return 0;
......
...@@ -152,7 +152,7 @@ static int32_t dnodeReadEps() { ...@@ -152,7 +152,7 @@ static int32_t dnodeReadEps() {
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
len = fread(content, 1, maxLen, fp); len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) { if (len <= 0) {
dError("failed to read %s, content is null", file); dError("failed to read %s, content is null", file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
...@@ -199,7 +199,7 @@ static int32_t dnodeReadEps() { ...@@ -199,7 +199,7 @@ static int32_t dnodeReadEps() {
dError("failed to read %s, dnodeId not found", file); dError("failed to read %s, dnodeId not found", file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
ep->dnodeId = dnodeId->valueint; ep->dnodeId = (int32_t)dnodeId->valueint;
cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
......
...@@ -80,7 +80,7 @@ void dnodeUpdateEpSetForPeer(SRpcEpSet *ep) { ...@@ -80,7 +80,7 @@ void dnodeUpdateEpSetForPeer(SRpcEpSet *ep) {
pthread_mutex_lock(&tsMInfosMutex); pthread_mutex_lock(&tsMInfosMutex);
dInfo("minfos is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse); dInfo("minfos is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse);
for (int i = 0; i < ep->numOfEps; ++i) { for (int32_t i = 0; i < ep->numOfEps; ++i) {
ep->port[i] -= TSDB_PORT_DNODEDNODE; ep->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("minfo:%d %s:%u", i, ep->fqdn[i], ep->port[i]); dInfo("minfo:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
} }
...@@ -108,7 +108,7 @@ void dnodeGetMInfos(SMInfos *pMinfos) { ...@@ -108,7 +108,7 @@ void dnodeGetMInfos(SMInfos *pMinfos) {
void dnodeGetEpSetForPeer(SRpcEpSet *epSet) { void dnodeGetEpSetForPeer(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsMInfosMutex); pthread_mutex_lock(&tsMInfosMutex);
*epSet = tsMEpSet; *epSet = tsMEpSet;
for (int i = 0; i < epSet->numOfEps; ++i) { for (int32_t i = 0; i < epSet->numOfEps; ++i) {
epSet->port[i] += TSDB_PORT_DNODEDNODE; epSet->port[i] += TSDB_PORT_DNODEDNODE;
} }
pthread_mutex_unlock(&tsMInfosMutex); pthread_mutex_unlock(&tsMInfosMutex);
...@@ -171,7 +171,7 @@ static int32_t dnodeReadMInfos() { ...@@ -171,7 +171,7 @@ static int32_t dnodeReadMInfos() {
goto PARSE_MINFOS_OVER; goto PARSE_MINFOS_OVER;
} }
len = fread(content, 1, maxLen, fp); len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) { if (len <= 0) {
dError("failed to read %s, content is null", file); dError("failed to read %s, content is null", file);
goto PARSE_MINFOS_OVER; goto PARSE_MINFOS_OVER;
...@@ -189,14 +189,14 @@ static int32_t dnodeReadMInfos() { ...@@ -189,14 +189,14 @@ static int32_t dnodeReadMInfos() {
dError("failed to read mnodeEpSet.json, inUse not found"); dError("failed to read mnodeEpSet.json, inUse not found");
goto PARSE_MINFOS_OVER; goto PARSE_MINFOS_OVER;
} }
tsMInfos.inUse = inUse->valueint; tsMInfos.inUse = (int8_t)inUse->valueint;
cJSON *nodeNum = cJSON_GetObjectItem(root, "nodeNum"); cJSON *nodeNum = cJSON_GetObjectItem(root, "nodeNum");
if (!nodeNum || nodeNum->type != cJSON_Number) { if (!nodeNum || nodeNum->type != cJSON_Number) {
dError("failed to read mnodeEpSet.json, nodeNum not found"); dError("failed to read mnodeEpSet.json, nodeNum not found");
goto PARSE_MINFOS_OVER; goto PARSE_MINFOS_OVER;
} }
minfos.mnodeNum = nodeNum->valueint; minfos.mnodeNum = (int8_t)nodeNum->valueint;
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) { if (!nodeInfos || nodeInfos->type != cJSON_Array) {
...@@ -204,13 +204,13 @@ static int32_t dnodeReadMInfos() { ...@@ -204,13 +204,13 @@ static int32_t dnodeReadMInfos() {
goto PARSE_MINFOS_OVER; goto PARSE_MINFOS_OVER;
} }
int size = cJSON_GetArraySize(nodeInfos); int32_t size = cJSON_GetArraySize(nodeInfos);
if (size != minfos.mnodeNum) { if (size != minfos.mnodeNum) {
dError("failed to read mnodeEpSet.json, nodeInfos size not matched"); dError("failed to read mnodeEpSet.json, nodeInfos size not matched");
goto PARSE_MINFOS_OVER; goto PARSE_MINFOS_OVER;
} }
for (int i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i); cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue; if (nodeInfo == NULL) continue;
...@@ -227,7 +227,7 @@ static int32_t dnodeReadMInfos() { ...@@ -227,7 +227,7 @@ static int32_t dnodeReadMInfos() {
} }
SMInfo *pMinfo = &minfos.mnodeInfos[i]; SMInfo *pMinfo = &minfos.mnodeInfos[i];
pMinfo->mnodeId = nodeId->valueint; pMinfo->mnodeId = (int32_t)nodeId->valueint;
tstrncpy(pMinfo->mnodeEp, nodeEp->valuestring, TSDB_EP_LEN); tstrncpy(pMinfo->mnodeEp, nodeEp->valuestring, TSDB_EP_LEN);
bool changed = dnodeCheckEpChanged(pMinfo->mnodeId, pMinfo->mnodeEp); bool changed = dnodeCheckEpChanged(pMinfo->mnodeId, pMinfo->mnodeEp);
......
...@@ -60,7 +60,7 @@ int32_t dnodeInitMPeer() { ...@@ -60,7 +60,7 @@ int32_t dnodeInitMPeer() {
void dnodeCleanupMPeer() { void dnodeCleanupMPeer() {
for (int32_t i = 0; i < tsMPeerWP.maxNum; ++i) { for (int32_t i = 0; i < tsMPeerWP.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerWP.worker + i; SMPeerWorker *pWorker = tsMPeerWP.worker + i;
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
taosQsetThreadResume(tsMPeerQset); taosQsetThreadResume(tsMPeerQset);
} }
dDebug("dnode mpeer worker:%d is closed", i); dDebug("dnode mpeer worker:%d is closed", i);
...@@ -69,7 +69,7 @@ void dnodeCleanupMPeer() { ...@@ -69,7 +69,7 @@ void dnodeCleanupMPeer() {
for (int32_t i = 0; i < tsMPeerWP.maxNum; ++i) { for (int32_t i = 0; i < tsMPeerWP.maxNum; ++i) {
SMPeerWorker *pWorker = tsMPeerWP.worker + i; SMPeerWorker *pWorker = tsMPeerWP.worker + i;
dDebug("dnode mpeer worker:%d start to join", i); dDebug("dnode mpeer worker:%d start to join", i);
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
dDebug("dnode mpeer worker:%d join success", i); dDebug("dnode mpeer worker:%d join success", i);
......
...@@ -40,7 +40,7 @@ static void *dnodeProcessMReadQueue(void *param); ...@@ -40,7 +40,7 @@ static void *dnodeProcessMReadQueue(void *param);
int32_t dnodeInitMRead() { int32_t dnodeInitMRead() {
tsMReadQset = taosOpenQset(); tsMReadQset = taosOpenQset();
tsMReadWP.maxNum = tsNumOfCores * tsNumOfThreadsPerCore / 2; tsMReadWP.maxNum = (int32_t)(tsNumOfCores * tsNumOfThreadsPerCore / 2);
tsMReadWP.maxNum = MAX(2, tsMReadWP.maxNum); tsMReadWP.maxNum = MAX(2, tsMReadWP.maxNum);
tsMReadWP.maxNum = MIN(4, tsMReadWP.maxNum); tsMReadWP.maxNum = MIN(4, tsMReadWP.maxNum);
tsMReadWP.curNum = 0; tsMReadWP.curNum = 0;
...@@ -60,7 +60,7 @@ int32_t dnodeInitMRead() { ...@@ -60,7 +60,7 @@ int32_t dnodeInitMRead() {
void dnodeCleanupMRead() { void dnodeCleanupMRead() {
for (int32_t i = 0; i < tsMReadWP.maxNum; ++i) { for (int32_t i = 0; i < tsMReadWP.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadWP.worker + i; SMReadWorker *pWorker = tsMReadWP.worker + i;
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
taosQsetThreadResume(tsMReadQset); taosQsetThreadResume(tsMReadQset);
} }
dDebug("dnode mread worker:%d is closed", i); dDebug("dnode mread worker:%d is closed", i);
...@@ -69,7 +69,7 @@ void dnodeCleanupMRead() { ...@@ -69,7 +69,7 @@ void dnodeCleanupMRead() {
for (int32_t i = 0; i < tsMReadWP.maxNum; ++i) { for (int32_t i = 0; i < tsMReadWP.maxNum; ++i) {
SMReadWorker *pWorker = tsMReadWP.worker + i; SMReadWorker *pWorker = tsMReadWP.worker + i;
dDebug("dnode mread worker:%d start to join", i); dDebug("dnode mread worker:%d start to join", i);
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
dDebug("dnode mread worker:%d start to join", i); dDebug("dnode mread worker:%d start to join", i);
......
...@@ -60,7 +60,7 @@ int32_t dnodeInitMWrite() { ...@@ -60,7 +60,7 @@ int32_t dnodeInitMWrite() {
void dnodeCleanupMWrite() { void dnodeCleanupMWrite() {
for (int32_t i = 0; i < tsMWriteWP.maxNum; ++i) { for (int32_t i = 0; i < tsMWriteWP.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWriteWP.worker + i; SMWriteWorker *pWorker = tsMWriteWP.worker + i;
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
taosQsetThreadResume(tsMWriteQset); taosQsetThreadResume(tsMWriteQset);
} }
dDebug("dnode mwrite worker:%d is closed", i); dDebug("dnode mwrite worker:%d is closed", i);
...@@ -69,7 +69,7 @@ void dnodeCleanupMWrite() { ...@@ -69,7 +69,7 @@ void dnodeCleanupMWrite() {
for (int32_t i = 0; i < tsMWriteWP.maxNum; ++i) { for (int32_t i = 0; i < tsMWriteWP.maxNum; ++i) {
SMWriteWorker *pWorker = tsMWriteWP.worker + i; SMWriteWorker *pWorker = tsMWriteWP.worker + i;
dDebug("dnode mwrite worker:%d start to join", i); dDebug("dnode mwrite worker:%d start to join", i);
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
dDebug("dnode mwrite worker:%d join success", i); dDebug("dnode mwrite worker:%d join success", i);
......
...@@ -90,7 +90,10 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -90,7 +90,10 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}; };
if (pMsg->pCont == NULL) return; if (pMsg->pCont == NULL) return;
if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) return dnodeSendStartupStep(pMsg); if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
dnodeSendStartupStep(pMsg);
return;
}
if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() != TSDB_RUN_STATUS_RUNING) {
rspMsg.code = TSDB_CODE_APP_NOT_READY; rspMsg.code = TSDB_CODE_APP_NOT_READY;
......
...@@ -70,7 +70,7 @@ int32_t dnodeInitShell() { ...@@ -70,7 +70,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
int32_t numOfThreads = (tsNumOfCores * tsNumOfThreadsPerCore) / 2.0; int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
numOfThreads = 1; numOfThreads = 1;
} }
......
...@@ -70,5 +70,5 @@ int32_t dnodeStepInit(SStep *pSteps, int32_t stepSize) { ...@@ -70,5 +70,5 @@ int32_t dnodeStepInit(SStep *pSteps, int32_t stepSize) {
} }
void dnodeStepCleanup(SStep *pSteps, int32_t stepSize) { void dnodeStepCleanup(SStep *pSteps, int32_t stepSize) {
return taosStepCleanupImp(pSteps, stepSize - 1); taosStepCleanupImp(pSteps, stepSize - 1);
} }
\ No newline at end of file
...@@ -19,9 +19,43 @@ ...@@ -19,9 +19,43 @@
#include "tconfig.h" #include "tconfig.h"
#include "dnodeMain.h" #include "dnodeMain.h"
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context);
static tsem_t exitSem; static tsem_t exitSem;
#ifdef WINDOWS
static void signal_handler(int32_t signum) {
dInfo("shut down signal is %d", signum);
#else
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
if (signum == SIGUSR1) {
taosCfgDynamicOptions("debugFlag 143");
return;
}
if (signum == SIGUSR2) {
taosCfgDynamicOptions("resetlog");
return;
}
dInfo("shut down signal is %d, sender PID:%d cmdline:%s", signum, sigInfo->si_pid, taosGetCmdlineByPID(sigInfo->si_pid));
#endif
syslog(LOG_INFO, "Shut down signal is %d", signum);
syslog(LOG_INFO, "Shutting down TDengine service...");
// protect the application from receive another signal
struct sigaction act = {{0}};
act.sa_handler = SIG_IGN;
sigaction(SIGTERM, &act, NULL);
sigaction(SIGINT, &act, NULL);
#ifndef WINDOWS
sigaction(SIGHUP, &act, NULL);
sigaction(SIGUSR1, &act, NULL);
sigaction(SIGUSR2, &act, NULL);
#endif
// inform main thread to exit
tsem_post(&exitSem);
}
int32_t main(int32_t argc, char *argv[]) { int32_t main(int32_t argc, char *argv[]) {
int dump_config = 0; int dump_config = 0;
...@@ -113,6 +147,8 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -113,6 +147,8 @@ int32_t main(int32_t argc, char *argv[]) {
/* Set termination handler. */ /* Set termination handler. */
struct sigaction act = {{0}}; struct sigaction act = {{0}};
#ifndef WINDOWS
act.sa_flags = SA_SIGINFO; act.sa_flags = SA_SIGINFO;
act.sa_sigaction = signal_handler; act.sa_sigaction = signal_handler;
sigaction(SIGTERM, &act, NULL); sigaction(SIGTERM, &act, NULL);
...@@ -120,6 +156,11 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -120,6 +156,11 @@ int32_t main(int32_t argc, char *argv[]) {
sigaction(SIGINT, &act, NULL); sigaction(SIGINT, &act, NULL);
sigaction(SIGUSR1, &act, NULL); sigaction(SIGUSR1, &act, NULL);
sigaction(SIGUSR2, &act, NULL); sigaction(SIGUSR2, &act, NULL);
#else
act.sa_handler = signal_handler;
sigaction(SIGTERM, &act, NULL);
sigaction(SIGINT, &act, NULL);
#endif
// Open /var/log/syslog file to record information. // Open /var/log/syslog file to record information.
openlog("TDengine:", LOG_PID | LOG_CONS | LOG_NDELAY, LOG_LOCAL1); openlog("TDengine:", LOG_PID | LOG_CONS | LOG_NDELAY, LOG_LOCAL1);
...@@ -146,31 +187,3 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -146,31 +187,3 @@ int32_t main(int32_t argc, char *argv[]) {
closelog(); closelog();
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }
static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
if (signum == SIGUSR1) {
taosCfgDynamicOptions("debugFlag 143");
return;
}
if (signum == SIGUSR2) {
taosCfgDynamicOptions("resetlog");
return;
}
syslog(LOG_INFO, "Shut down signal is %d", signum);
syslog(LOG_INFO, "Shutting down TDengine service...");
// clean the system.
dInfo("shut down signal is %d, sender PID:%d cmdline:%s", signum, sigInfo->si_pid, taosGetCmdlineByPID(sigInfo->si_pid));
// protect the application from receive another signal
struct sigaction act = {{0}};
act.sa_handler = SIG_IGN;
sigaction(SIGTERM, &act, NULL);
sigaction(SIGHUP, &act, NULL);
sigaction(SIGINT, &act, NULL);
sigaction(SIGUSR1, &act, NULL);
sigaction(SIGUSR2, &act, NULL);
// inform main thread to exit
tsem_post(&exitSem);
}
...@@ -93,14 +93,14 @@ static void addStringField(SBufferWriter* bw, const char* k, const char* v) { ...@@ -93,14 +93,14 @@ static void addStringField(SBufferWriter* bw, const char* k, const char* v) {
static void addCpuInfo(SBufferWriter* bw) { static void addCpuInfo(SBufferWriter* bw) {
char * line = NULL; char * line = NULL;
size_t size = 0; size_t size = 0;
int done = 0; int32_t done = 0;
FILE* fp = fopen("/proc/cpuinfo", "r"); FILE* fp = fopen("/proc/cpuinfo", "r");
if (fp == NULL) { if (fp == NULL) {
return; return;
} }
while (done != 3 && (size = getline(&line, &size, fp)) != -1) { while (done != 3 && (size = tgetline(&line, &size, fp)) != -1) {
line[size - 1] = '\0'; line[size - 1] = '\0';
if (((done&1) == 0) && strncmp(line, "model name", 10) == 0) { if (((done&1) == 0) && strncmp(line, "model name", 10) == 0) {
const char* v = strchr(line, ':') + 2; const char* v = strchr(line, ':') + 2;
...@@ -129,7 +129,7 @@ static void addOsInfo(SBufferWriter* bw) { ...@@ -129,7 +129,7 @@ static void addOsInfo(SBufferWriter* bw) {
return; return;
} }
while ((size = getline(&line, &size, fp)) != -1) { while ((size = tgetline(&line, &size, fp)) != -1) {
line[size - 1] = '\0'; line[size - 1] = '\0';
if (strncmp(line, "PRETTY_NAME", 11) == 0) { if (strncmp(line, "PRETTY_NAME", 11) == 0) {
const char* p = strchr(line, '=') + 1; const char* p = strchr(line, '=') + 1;
...@@ -155,7 +155,7 @@ static void addMemoryInfo(SBufferWriter* bw) { ...@@ -155,7 +155,7 @@ static void addMemoryInfo(SBufferWriter* bw) {
return; return;
} }
while ((size = getline(&line, &size, fp)) != -1) { while ((size = tgetline(&line, &size, fp)) != -1) {
line[size - 1] = '\0'; line[size - 1] = '\0';
if (strncmp(line, "MemTotal", 8) == 0) { if (strncmp(line, "MemTotal", 8) == 0) {
const char* p = strchr(line, ':') + 1; const char* p = strchr(line, ':') + 1;
...@@ -200,7 +200,7 @@ static void sendTelemetryReport() { ...@@ -200,7 +200,7 @@ static void sendTelemetryReport() {
dTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno)); dTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno));
return; return;
} }
int fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0); SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0);
if (fd < 0) { if (fd < 0) {
dTrace("failed to create socket for telemetry, reason:%s", strerror(errno)); dTrace("failed to create socket for telemetry, reason:%s", strerror(errno));
return; return;
...@@ -222,10 +222,10 @@ static void sendTelemetryReport() { ...@@ -222,10 +222,10 @@ static void sendTelemetryReport() {
"Content-Type: application/json\n" "Content-Type: application/json\n"
"Content-Length: "; "Content-Length: ";
taosWriteSocket(fd, header, strlen(header)); taosWriteSocket(fd, header, (int32_t)strlen(header));
int contLen = tbufTell(&bw) - 1; int32_t contLen = (int32_t)(tbufTell(&bw) - 1);
sprintf(buf, "%d\n\n", contLen); sprintf(buf, "%d\n\n", contLen);
taosWriteSocket(fd, buf, strlen(buf)); taosWriteSocket(fd, buf, (int32_t)strlen(buf));
taosWriteSocket(fd, tbufGetData(&bw, false), contLen); taosWriteSocket(fd, tbufGetData(&bw, false), contLen);
tbufCloseWriter(&bw); tbufCloseWriter(&bw);
...@@ -258,7 +258,7 @@ static void* telemetryThread(void* param) { ...@@ -258,7 +258,7 @@ static void* telemetryThread(void* param) {
} }
static void dnodeGetEmail(char* filepath) { static void dnodeGetEmail(char* filepath) {
int fd = open(filepath, O_RDONLY); int32_t fd = open(filepath, O_RDONLY);
if (fd < 0) { if (fd < 0) {
return; return;
} }
...@@ -267,10 +267,9 @@ static void dnodeGetEmail(char* filepath) { ...@@ -267,10 +267,9 @@ static void dnodeGetEmail(char* filepath) {
dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno)); dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
} }
close(fd); taosClose(fd);
} }
int32_t dnodeInitTelemetry() { int32_t dnodeInitTelemetry() {
if (!tsEnableTelemetryReporting) { if (!tsEnableTelemetryReporting) {
return 0; return 0;
...@@ -303,7 +302,7 @@ void dnodeCleanupTelemetry() { ...@@ -303,7 +302,7 @@ void dnodeCleanupTelemetry() {
return; return;
} }
if (tsTelemetryThread) { if (taosCheckPthreadValid(tsTelemetryThread)) {
tsem_post(&tsExitSem); tsem_post(&tsExitSem);
pthread_join(tsTelemetryThread, NULL); pthread_join(tsTelemetryThread, NULL);
tsem_destroy(&tsExitSem); tsem_destroy(&tsExitSem);
......
...@@ -52,14 +52,14 @@ int32_t dnodeInitVWrite() { ...@@ -52,14 +52,14 @@ int32_t dnodeInitVWrite() {
void dnodeCleanupVWrite() { void dnodeCleanupVWrite() {
for (int32_t i = 0; i < tsVWriteWP.max; ++i) { for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SVWriteWorker *pWorker = tsVWriteWP.worker + i; SVWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
taosQsetThreadResume(pWorker->qset); taosQsetThreadResume(pWorker->qset);
} }
} }
for (int32_t i = 0; i < tsVWriteWP.max; ++i) { for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SVWriteWorker *pWorker = tsVWriteWP.worker + i; SVWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
......
...@@ -157,7 +157,7 @@ int32_t dnodeInitVnodes() { ...@@ -157,7 +157,7 @@ int32_t dnodeInitVnodes() {
int32_t failedVnodes = 0; int32_t failedVnodes = 0;
for (int32_t t = 0; t < threadNum; ++t) { for (int32_t t = 0; t < threadNum; ++t) {
SOpenVnodeThread *pThread = &threads[t]; SOpenVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && pThread->thread) { if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
pthread_join(pThread->thread, NULL); pthread_join(pThread->thread, NULL);
} }
openVnodes += pThread->opened; openVnodes += pThread->opened;
...@@ -260,7 +260,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { ...@@ -260,7 +260,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64); tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64);
pStatus->clusterCfg.checkTime = 0; pStatus->clusterCfg.checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00"; char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN); tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN); tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
......
...@@ -39,7 +39,7 @@ ...@@ -39,7 +39,7 @@
#include "mnodeCluster.h" #include "mnodeCluster.h"
int32_t tsAccessSquence = 0; int32_t tsAccessSquence = 0;
int64_t tsDnodeRid = -1; int64_t tsDnodeRid = -1;
static void * tsDnodeSdb = NULL; static void * tsDnodeSdb = NULL;
static int32_t tsDnodeUpdateSize = 0; static int32_t tsDnodeUpdateSize = 0;
extern void * tsMnodeSdb; extern void * tsMnodeSdb;
......
...@@ -841,6 +841,7 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) { ...@@ -841,6 +841,7 @@ static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} else { // batch master replay, reprocess the whole batch } else { // batch master replay, reprocess the whole batch
assert(0); assert(0);
return TSDB_CODE_MND_MSG_NOT_PROCESSED;
} }
} }
} }
......
...@@ -208,6 +208,12 @@ typedef struct { ...@@ -208,6 +208,12 @@ typedef struct {
int wordexp(const char *words, wordexp_t *pwordexp, int flags); int wordexp(const char *words, wordexp_t *pwordexp, int flags);
void wordfree(wordexp_t *pwordexp); void wordfree(wordexp_t *pwordexp);
#define openlog(a, b, c)
#define closelog()
#define LOG_ERR 0
#define LOG_INFO 1
void syslog(int unused, const char *format, ...);
#define TAOS_OS_FUNC_ATOMIC #define TAOS_OS_FUNC_ATOMIC
#define atomic_load_8(ptr) (*(char volatile*)(ptr)) #define atomic_load_8(ptr) (*(char volatile*)(ptr))
#define atomic_load_16(ptr) (*(short volatile*)(ptr)) #define atomic_load_16(ptr) (*(short volatile*)(ptr))
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
void syslog(int unused, const char *format, ...) {}
\ No newline at end of file
...@@ -235,4 +235,9 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) { ...@@ -235,4 +235,9 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) {
return EXCEPTION_CONTINUE_SEARCH; return EXCEPTION_CONTINUE_SEARCH;
} }
void taosSetCoreDump() { SetUnhandledExceptionFilter(&FlCrashDump); } void taosSetCoreDump() { SetUnhandledExceptionFilter(&FlCrashDump); }
\ No newline at end of file
bool taosGetSystemUid(char *uid) {
sprintf(uid, "uid_not_implemented_yet");
return true;
}
\ No newline at end of file
...@@ -3,6 +3,6 @@ PROJECT(TDengine) ...@@ -3,6 +3,6 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY(monitor) ADD_SUBDIRECTORY(monitor)
ADD_SUBDIRECTORY(http) ADD_SUBDIRECTORY(http)
IF (TD_MQTT) IF (TD_LINUX AND TD_MQTT)
ADD_SUBDIRECTORY(mqtt) ADD_SUBDIRECTORY(mqtt)
ENDIF () ENDIF ()
\ No newline at end of file
...@@ -27,10 +27,6 @@ ...@@ -27,10 +27,6 @@
#include "syncInt.h" #include "syncInt.h"
#include "syncTcp.h" #include "syncTcp.h"
#ifndef SIGHUP
#define SIGHUP SIGTERM
#endif
static void arbSignalHandler(int32_t signum); static void arbSignalHandler(int32_t signum);
static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp); static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(int64_t rid); static void arbProcessBrokenLink(int64_t rid);
...@@ -78,8 +74,10 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -78,8 +74,10 @@ int32_t main(int32_t argc, char *argv[]) {
act.sa_handler = arbSignalHandler; act.sa_handler = arbSignalHandler;
sigaction(SIGTERM, &act, NULL); sigaction(SIGTERM, &act, NULL);
sigaction(SIGHUP, &act, NULL);
sigaction(SIGINT, &act, NULL); sigaction(SIGINT, &act, NULL);
#ifndef WINDOWS
sigaction(SIGHUP, &act, NULL);
#endif
tsAsyncLog = 0; tsAsyncLog = 0;
strcat(arbLogPath, "/arbitrator.log"); strcat(arbLogPath, "/arbitrator.log");
...@@ -180,8 +178,10 @@ static void arbSignalHandler(int32_t signum) { ...@@ -180,8 +178,10 @@ static void arbSignalHandler(int32_t signum) {
struct sigaction act = {{0}}; struct sigaction act = {{0}};
act.sa_handler = SIG_IGN; act.sa_handler = SIG_IGN;
sigaction(SIGTERM, &act, NULL); sigaction(SIGTERM, &act, NULL);
sigaction(SIGHUP, &act, NULL);
sigaction(SIGINT, &act, NULL); sigaction(SIGINT, &act, NULL);
#ifndef WINDOWS
sigaction(SIGHUP, &act, NULL);
#endif
sInfo("shut down signal is %d", signum); sInfo("shut down signal is %d", signum);
......
CMAKE_MINIMUM_REQUIRED(VERSION 3.5) CMAKE_MINIMUM_REQUIRED(VERSION 3.5)
PROJECT(TDengine) PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
...@@ -9,7 +10,7 @@ INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) ...@@ -9,7 +10,7 @@ INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(inc) INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX) IF (TD_LINUX OR TD_WINDOWS)
ADD_LIBRARY(vnode ${SRC}) ADD_LIBRARY(vnode ${SRC})
TARGET_LINK_LIBRARIES(vnode tsdb tcq) TARGET_LINK_LIBRARIES(vnode tsdb tcq)
ENDIF () ENDIF ()
...@@ -41,7 +41,7 @@ int32_t vnodeReadVersion(SVnodeObj *pVnode) { ...@@ -41,7 +41,7 @@ int32_t vnodeReadVersion(SVnodeObj *pVnode) {
goto PARSE_VER_ERROR; goto PARSE_VER_ERROR;
} }
len = fread(content, 1, maxLen, fp); len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) { if (len <= 0) {
vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file); vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file);
goto PARSE_VER_ERROR; goto PARSE_VER_ERROR;
......
...@@ -109,7 +109,7 @@ static void vnodeStopMWorker() { ...@@ -109,7 +109,7 @@ static void vnodeStopMWorker() {
void vnodeCleanupMWorker() { void vnodeCleanupMWorker() {
for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) { for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) {
SVMWorker *pWorker = tsVMWorkerPool.worker + i; SVMWorker *pWorker = tsVMWorkerPool.worker + i;
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
taosQsetThreadResume(tsVMWorkerQset); taosQsetThreadResume(tsVMWorkerQset);
} }
vDebug("vmworker:%d is closed", i); vDebug("vmworker:%d is closed", i);
...@@ -118,7 +118,7 @@ void vnodeCleanupMWorker() { ...@@ -118,7 +118,7 @@ void vnodeCleanupMWorker() {
for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) { for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) {
SVMWorker *pWorker = tsVMWorkerPool.worker + i; SVMWorker *pWorker = tsVMWorkerPool.worker + i;
vDebug("vmworker:%d start to join", i); vDebug("vmworker:%d start to join", i);
if (pWorker->thread) { if (taosCheckPthreadValid(pWorker->thread)) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
vDebug("vmworker:%d join success", i); vDebug("vmworker:%d join success", i);
......
...@@ -333,7 +333,7 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { ...@@ -333,7 +333,7 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->flowctrlLevel <= 0) return 0; if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->flowctrlLevel <= 0) return 0;
if (tsEnableFlowCtrl == 0) { if (tsEnableFlowCtrl == 0) {
int32_t ms = pow(2, pVnode->flowctrlLevel + 2); int32_t ms = (int32_t)pow(2, pVnode->flowctrlLevel + 2);
if (ms > 100) ms = 100; if (ms > 100) ms = 100;
vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl for %d ms", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, ms); vTrace("vgId:%d, msg:%p, app:%p, perform flowctrl for %d ms", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, ms);
taosMsleep(ms); taosMsleep(ms);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册