提交 d132653f 编写于 作者: S Shengliang Guan

TD-10431 telemetry coding

上级 b5e9ba4a
...@@ -31,6 +31,7 @@ typedef struct { ...@@ -31,6 +31,7 @@ typedef struct {
int16_t numOfSupportMnodes; int16_t numOfSupportMnodes;
int16_t numOfSupportVnodes; int16_t numOfSupportVnodes;
int16_t numOfSupportQnodes; int16_t numOfSupportQnodes;
int8_t enableTelem;
int32_t statusInterval; int32_t statusInterval;
int32_t mnodeEqualVnodeNum; int32_t mnodeEqualVnodeNum;
float numOfThreadsPerCore; float numOfThreadsPerCore;
...@@ -45,6 +46,8 @@ typedef struct { ...@@ -45,6 +46,8 @@ typedef struct {
char timezone[TSDB_TIMEZONE_LEN]; char timezone[TSDB_TIMEZONE_LEN];
char locale[TSDB_LOCALE_LEN]; char locale[TSDB_LOCALE_LEN];
char charset[TSDB_LOCALE_LEN]; char charset[TSDB_LOCALE_LEN];
char buildinfo[64];
char gitinfo[48];
} SDnodeOpt; } SDnodeOpt;
/* ------------------------ SDnode ------------------------ */ /* ------------------------ SDnode ------------------------ */
......
...@@ -43,24 +43,31 @@ typedef struct SMnodeLoad { ...@@ -43,24 +43,31 @@ typedef struct SMnodeLoad {
int64_t compStorage; int64_t compStorage;
} SMnodeLoad; } SMnodeLoad;
typedef struct SMnodeCfg {
int32_t sver;
int8_t enableTelem;
int32_t statusInterval;
int32_t mnodeEqualVnodeNum;
int32_t shellActivityTimer;
char *timezone;
char *locale;
char *charset;
char *buildinfo;
char *gitinfo;
} SMnodeCfg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int32_t clusterId;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
SMnodeCfg cfg;
SDnode *pDnode; SDnode *pDnode;
PutMsgToMnodeQFp putMsgToApplyMsgFp; PutMsgToMnodeQFp putMsgToApplyMsgFp;
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectMsgFp sendRedirectMsgFp;
int32_t sver;
int32_t statusInterval;
int32_t mnodeEqualVnodeNum;
int32_t shellActivityTimer;
char *timezone;
char *locale;
char *charset;
} SMnodeOpt; } SMnodeOpt;
/* ------------------------ SMnode ------------------------ */ /* ------------------------ SMnode ------------------------ */
......
...@@ -76,6 +76,10 @@ int32_t* taosGetErrno(); ...@@ -76,6 +76,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0114) #define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0114)
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0115) #define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0115)
#define TSDB_CODE_INVALID_VERSION_NUMBER TAOS_DEF_ERROR_CODE(0, 0x0120)
#define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x0121)
#define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x0122)
//client //client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation") #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation")
#define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) //"Invalid qhandle") #define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) //"Invalid qhandle")
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_VERSION_H
#define _TD_UTIL_VERSION_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t taosVersionStrToInt(const char *vstr, int32_t *vint);
int32_t taosVersionIntToStr(int32_t vint, char *vstr, int32_t len);
int32_t taosCheckVersionCompatible(int32_t clientVer, int32_t serverVer, int32_t comparedSegments);
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_VERSION_H*/
#include "os.h"
#include "tdef.h"
#include "ulog.h"
#include "taoserror.h"
bool taosGetVersionNumber(char *versionStr, int *versionNubmer) {
if (versionStr == NULL || versionNubmer == NULL) {
return false;
}
int versionNumberPos[5] = {0};
int len = (int)strlen(versionStr);
int dot = 0;
for (int pos = 0; pos < len && dot < 4; ++pos) {
if (versionStr[pos] == '.') {
versionStr[pos] = 0;
versionNumberPos[++dot] = pos + 1;
}
}
if (dot != 3) {
return false;
}
for (int pos = 0; pos < 4; ++pos) {
versionNubmer[pos] = atoi(versionStr + versionNumberPos[pos]);
}
versionStr[versionNumberPos[1] - 1] = '.';
versionStr[versionNumberPos[2] - 1] = '.';
versionStr[versionNumberPos[3] - 1] = '.';
return true;
}
int taosCheckVersion(char *input_client_version, char *input_server_version, int comparedSegments) {
char client_version[TSDB_VERSION_LEN] = {0};
char server_version[TSDB_VERSION_LEN] = {0};
int clientVersionNumber[4] = {0};
int serverVersionNumber[4] = {0};
tstrncpy(client_version, input_client_version, sizeof(client_version));
tstrncpy(server_version, input_server_version, sizeof(server_version));
if (!taosGetVersionNumber(client_version, clientVersionNumber)) {
uError("invalid client version:%s", client_version);
return TSDB_CODE_TSC_INVALID_VERSION;
}
if (!taosGetVersionNumber(server_version, serverVersionNumber)) {
uError("invalid server version:%s", server_version);
return TSDB_CODE_TSC_INVALID_VERSION;
}
for(int32_t i = 0; i < comparedSegments; ++i) {
if (clientVersionNumber[i] != serverVersionNumber[i]) {
uError("the %d-th number of server version:%s not matched with client version:%s", i, server_version,
client_version);
return TSDB_CODE_TSC_INVALID_VERSION;
}
}
return 0;
}
...@@ -136,7 +136,7 @@ void dmnWaitSignal() { ...@@ -136,7 +136,7 @@ void dmnWaitSignal() {
} }
void dmnInitOption(SDnodeOpt *pOption) { void dmnInitOption(SDnodeOpt *pOption) {
pOption->sver = tsVersion; pOption->sver = 30000000; //3.0.0.0
pOption->numOfCores = tsNumOfCores; pOption->numOfCores = tsNumOfCores;
pOption->numOfSupportMnodes = 1; pOption->numOfSupportMnodes = 1;
pOption->numOfSupportVnodes = 1; pOption->numOfSupportVnodes = 1;
...@@ -155,6 +155,8 @@ void dmnInitOption(SDnodeOpt *pOption) { ...@@ -155,6 +155,8 @@ void dmnInitOption(SDnodeOpt *pOption) {
tstrncpy(pOption->timezone, tsTimezone, TSDB_TIMEZONE_LEN); tstrncpy(pOption->timezone, tsTimezone, TSDB_TIMEZONE_LEN);
tstrncpy(pOption->locale, tsLocale, TSDB_LOCALE_LEN); tstrncpy(pOption->locale, tsLocale, TSDB_LOCALE_LEN);
tstrncpy(pOption->charset, tsCharset, TSDB_LOCALE_LEN); tstrncpy(pOption->charset, tsCharset, TSDB_LOCALE_LEN);
tstrncpy(pOption->buildinfo, buildinfo, 64);
tstrncpy(pOption->gitinfo, gitinfo, 48);
} }
int dmnRunDnode() { int dmnRunDnode() {
......
...@@ -331,13 +331,16 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -331,13 +331,16 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue; pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->sver = pDnode->opt.sver; pOption->cfg.sver = pDnode->opt.sver;
pOption->statusInterval = pDnode->opt.statusInterval; pOption->cfg.enableTelem = pDnode->opt.enableTelem;
pOption->mnodeEqualVnodeNum = pDnode->opt.mnodeEqualVnodeNum; pOption->cfg.statusInterval = pDnode->opt.statusInterval;
pOption->shellActivityTimer = pDnode->opt.shellActivityTimer; pOption->cfg.mnodeEqualVnodeNum = pDnode->opt.mnodeEqualVnodeNum;
pOption->timezone = pDnode->opt.timezone; pOption->cfg.shellActivityTimer = pDnode->opt.shellActivityTimer;
pOption->charset = pDnode->opt.charset; pOption->cfg.timezone = pDnode->opt.timezone;
pOption->locale = pDnode->opt.locale; pOption->cfg.charset = pDnode->opt.charset;
pOption->cfg.locale = pDnode->opt.locale;
pOption->cfg.gitinfo = pDnode->opt.gitinfo;
pOption->cfg.buildinfo = pDnode->opt.buildinfo;
} }
static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
......
...@@ -24,6 +24,7 @@ extern "C" { ...@@ -24,6 +24,7 @@ extern "C" {
int32_t mndInitCluster(SMnode *pMnode); int32_t mndInitCluster(SMnode *pMnode);
void mndCleanupCluster(SMnode *pMnode); void mndCleanupCluster(SMnode *pMnode);
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -51,6 +51,15 @@ typedef struct { ...@@ -51,6 +51,15 @@ typedef struct {
SCacheObj *cache; SCacheObj *cache;
} SProfileMgmt; } SProfileMgmt;
typedef struct {
int8_t enable;
pthread_mutex_t lock;
pthread_cond_t cond;
volatile int32_t exit;
pthread_t thread;
char email[TSDB_FQDN_LEN];
} STelemMgmt;
typedef struct SMnode { typedef struct SMnode {
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int32_t clusterId;
...@@ -59,23 +68,18 @@ typedef struct SMnode { ...@@ -59,23 +68,18 @@ typedef struct SMnode {
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
tmr_h timer; tmr_h timer;
char *path; char *path;
SMnodeCfg cfg;
SSdb *pSdb; SSdb *pSdb;
SDnode *pDnode; SDnode *pDnode;
SArray *pSteps; SArray *pSteps;
SShowMgmt showMgmt; SShowMgmt showMgmt;
SProfileMgmt profileMgmt; SProfileMgmt profileMgmt;
STelemMgmt telemMgmt;
MndMsgFp msgFp[TSDB_MSG_TYPE_MAX]; MndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectMsgFp sendRedirectMsgFp;
PutMsgToMnodeQFp putMsgToApplyMsgFp; PutMsgToMnodeQFp putMsgToApplyMsgFp;
int32_t sver;
int32_t statusInterval;
int32_t mnodeEqualVnodeNum;
int32_t shellActivityTimer;
char *timezone;
char *locale;
char *charset;
} SMnode; } SMnode;
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
......
...@@ -48,6 +48,19 @@ int32_t mndInitCluster(SMnode *pMnode) { ...@@ -48,6 +48,19 @@ int32_t mndInitCluster(SMnode *pMnode) {
void mndCleanupCluster(SMnode *pMnode) {} void mndCleanupCluster(SMnode *pMnode) {}
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
SSdb *pSdb = pMnode->pSdb;
SClusterObj *pCluster = sdbAcquire(pSdb, SDB_CLUSTER, &pMnode->clusterId);
if (pCluster = NULL) {
return -1;
}
tstrncpy(clusterName, pCluster->name, len);
sdbRelease(pSdb, pCluster);
return 0;
}
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, SDB_CLUSTER_VER, sizeof(SClusterObj)); SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, SDB_CLUSTER_VER, sizeof(SClusterObj));
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
...@@ -198,4 +211,4 @@ static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data, ...@@ -198,4 +211,4 @@ static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter) { static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
\ No newline at end of file
...@@ -179,32 +179,33 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) { ...@@ -179,32 +179,33 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) {
} }
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
if (pCfg->mnodeEqualVnodeNum != pMnode->mnodeEqualVnodeNum) { if (pCfg->mnodeEqualVnodeNum != pMnode->cfg.mnodeEqualVnodeNum) {
mError("\"mnodeEqualVnodeNum\"[%d - %d] cfg inconsistent", pCfg->mnodeEqualVnodeNum, pMnode->mnodeEqualVnodeNum); mError("\"mnodeEqualVnodeNum\"[%d - %d] cfg inconsistent", pCfg->mnodeEqualVnodeNum,
pMnode->cfg.mnodeEqualVnodeNum);
return DND_REASON_MN_EQUAL_VN_NOT_MATCH; return DND_REASON_MN_EQUAL_VN_NOT_MATCH;
} }
if (pCfg->statusInterval != pMnode->statusInterval) { if (pCfg->statusInterval != pMnode->cfg.statusInterval) {
mError("\"statusInterval\"[%d - %d] cfg inconsistent", pCfg->statusInterval, pMnode->statusInterval); mError("\"statusInterval\"[%d - %d] cfg inconsistent", pCfg->statusInterval, pMnode->cfg.statusInterval);
return DND_REASON_STATUS_INTERVAL_NOT_MATCH; return DND_REASON_STATUS_INTERVAL_NOT_MATCH;
} }
int64_t checkTime = 0; int64_t checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00"; char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
if ((0 != strcasecmp(pCfg->timezone, pMnode->timezone)) && (checkTime != pCfg->checkTime)) { if ((0 != strcasecmp(pCfg->timezone, pMnode->cfg.timezone)) && (checkTime != pCfg->checkTime)) {
mError("\"timezone\"[%s - %s] [%" PRId64 " - %" PRId64 "] cfg inconsistent", pCfg->timezone, tsTimezone, mError("\"timezone\"[%s - %s] [%" PRId64 " - %" PRId64 "] cfg inconsistent", pCfg->timezone, pMnode->cfg.timezone,
pCfg->checkTime, checkTime); pCfg->checkTime, checkTime);
return DND_REASON_TIME_ZONE_NOT_MATCH; return DND_REASON_TIME_ZONE_NOT_MATCH;
} }
if (0 != strcasecmp(pCfg->locale, pMnode->locale)) { if (0 != strcasecmp(pCfg->locale, pMnode->cfg.locale)) {
mError("\"locale\"[%s - %s] cfg parameters inconsistent", pCfg->locale, pMnode->locale); mError("\"locale\"[%s - %s] cfg parameters inconsistent", pCfg->locale, pMnode->cfg.locale);
return DND_REASON_LOCALE_NOT_MATCH; return DND_REASON_LOCALE_NOT_MATCH;
} }
if (0 != strcasecmp(pCfg->charset, pMnode->charset)) { if (0 != strcasecmp(pCfg->charset, pMnode->cfg.charset)) {
mError("\"charset\"[%s - %s] cfg parameters inconsistent.", pCfg->charset, pMnode->charset); mError("\"charset\"[%s - %s] cfg parameters inconsistent.", pCfg->charset, pMnode->cfg.charset);
return DND_REASON_CHARSET_NOT_MATCH; return DND_REASON_CHARSET_NOT_MATCH;
} }
...@@ -251,12 +252,12 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { ...@@ -251,12 +252,12 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
} }
} }
if (pStatus->sver != pMnode->sver) { if (pStatus->sver != pMnode->cfg.sver) {
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
} }
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->sver); mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver);
return TSDB_CODE_MND_INVALID_MSG_VERSION; return TSDB_CODE_MND_INVALID_MSG_VERSION;
} }
......
...@@ -67,7 +67,7 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); ...@@ -67,7 +67,7 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
int32_t mndInitProfile(SMnode *pMnode) { int32_t mndInitProfile(SMnode *pMnode) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
int32_t connCheckTime = pMnode->shellActivityTimer * 2; int32_t connCheckTime = pMnode->cfg.shellActivityTimer * 2;
pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn"); pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn");
if (pMgmt->cache == NULL) { if (pMgmt->cache == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -126,7 +126,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t ...@@ -126,7 +126,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t
tstrncpy(connObj.user, user, TSDB_USER_LEN); tstrncpy(connObj.user, user, TSDB_USER_LEN);
tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN); tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);
int32_t keepTime = pMnode->shellActivityTimer * 3; int32_t keepTime = pMnode->cfg.shellActivityTimer * 3;
SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000); SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
if (pConn == NULL) { if (pConn == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -153,7 +153,7 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) { ...@@ -153,7 +153,7 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) {
return NULL; return NULL;
} }
int32_t keepTime = pMnode->shellActivityTimer * 3; int32_t keepTime = pMnode->cfg.shellActivityTimer * 3;
pConn->lastAccess = keepTime * 1000 + (uint64_t)taosGetTimestampMs(); pConn->lastAccess = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
mTrace("conn:%d, data:%p acquired from cache", pConn->id, pConn); mTrace("conn:%d, data:%p acquired from cache", pConn->id, pConn);
......
...@@ -69,7 +69,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg) { ...@@ -69,7 +69,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg) {
return NULL; return NULL;
} }
int32_t keepTime = pMnode->shellActivityTimer * 6 * 1000; int32_t keepTime = pMnode->cfg.shellActivityTimer * 6 * 1000;
SShowObj *pShowRet = taosCachePut(pMgmt->cache, &showId, sizeof(int32_t), pShow, size, keepTime); SShowObj *pShowRet = taosCachePut(pMgmt->cache, &showId, sizeof(int32_t), pShow, size, keepTime);
free(pShow); free(pShow);
if (pShowRet == NULL) { if (pShowRet == NULL) {
......
...@@ -15,27 +15,15 @@ ...@@ -15,27 +15,15 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndTelem.h" #include "mndTelem.h"
#include "tbuffer.h" #include "mndCluster.h"
#include "tglobal.h"
#include "mndSync.h" #include "mndSync.h"
#include "tbuffer.h"
#include "tversion.h"
#define TELEMETRY_SERVER "telemetry.taosdata.com" #define TELEMETRY_SERVER "telemetry.taosdata.com"
#define TELEMETRY_PORT 80 #define TELEMETRY_PORT 80
#define REPORT_INTERVAL 86400 #define REPORT_INTERVAL 86400
/*
* sem_timedwait is NOT implemented on MacOSX
* thus we use pthread_mutex_t/pthread_cond_t to simulate
*/
static struct {
bool enable;
pthread_mutex_t lock;
pthread_cond_t cond;
volatile int32_t exit;
pthread_t thread;
char email[TSDB_FQDN_LEN];
} tsTelem;
static void mndBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); } static void mndBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); }
static void mndCloseObject(SBufferWriter* bw) { static void mndCloseObject(SBufferWriter* bw) {
...@@ -86,7 +74,7 @@ static void mndAddStringField(SBufferWriter* bw, const char* k, const char* v) { ...@@ -86,7 +74,7 @@ static void mndAddStringField(SBufferWriter* bw, const char* k, const char* v) {
tbufWriteChar(bw, ','); tbufWriteChar(bw, ',');
} }
static void mndAddCpuInfo(SBufferWriter* bw) { static void mndAddCpuInfo(SMnode* pMnode, SBufferWriter* bw) {
char* line = NULL; char* line = NULL;
size_t size = 0; size_t size = 0;
int32_t done = 0; int32_t done = 0;
...@@ -116,7 +104,7 @@ static void mndAddCpuInfo(SBufferWriter* bw) { ...@@ -116,7 +104,7 @@ static void mndAddCpuInfo(SBufferWriter* bw) {
fclose(fp); fclose(fp);
} }
static void mndAddOsInfo(SBufferWriter* bw) { static void mndAddOsInfo(SMnode* pMnode, SBufferWriter* bw) {
char* line = NULL; char* line = NULL;
size_t size = 0; size_t size = 0;
...@@ -142,7 +130,7 @@ static void mndAddOsInfo(SBufferWriter* bw) { ...@@ -142,7 +130,7 @@ static void mndAddOsInfo(SBufferWriter* bw) {
fclose(fp); fclose(fp);
} }
static void mndAddMemoryInfo(SBufferWriter* bw) { static void mndAddMemoryInfo(SMnode* pMnode, SBufferWriter* bw) {
char* line = NULL; char* line = NULL;
size_t size = 0; size_t size = 0;
...@@ -165,16 +153,21 @@ static void mndAddMemoryInfo(SBufferWriter* bw) { ...@@ -165,16 +153,21 @@ static void mndAddMemoryInfo(SBufferWriter* bw) {
fclose(fp); fclose(fp);
} }
static void mndAddVersionInfo(SBufferWriter* bw) { static void mndAddVersionInfo(SMnode* pMnode, SBufferWriter* bw) {
mndAddStringField(bw, "version", version); STelemMgmt* pMgmt = &pMnode->telemMgmt;
mndAddStringField(bw, "buildInfo", buildinfo);
mndAddStringField(bw, "gitInfo", gitinfo); char vstr[32] = {0};
mndAddStringField(bw, "email", tsTelem.email); taosVersionIntToStr(pMnode->cfg.sver, vstr, 32);
mndAddStringField(bw, "version", vstr);
mndAddStringField(bw, "buildInfo", pMnode->cfg.buildinfo);
mndAddStringField(bw, "gitInfo", pMnode->cfg.gitinfo);
mndAddStringField(bw, "email", pMgmt->email);
} }
static void mndAddRuntimeInfo(SBufferWriter* bw) { static void mndAddRuntimeInfo(SMnode* pMnode, SBufferWriter* bw) {
SMnodeLoad load = {0}; SMnodeLoad load = {0};
if (mndGetLoad(NULL, &load) != 0) { if (mndGetLoad(pMnode, &load) != 0) {
return; return;
} }
...@@ -190,11 +183,13 @@ static void mndAddRuntimeInfo(SBufferWriter* bw) { ...@@ -190,11 +183,13 @@ static void mndAddRuntimeInfo(SBufferWriter* bw) {
mndAddIntField(bw, "compStorage", load.compStorage); mndAddIntField(bw, "compStorage", load.compStorage);
} }
static void mndSendTelemetryReport() { static void mndSendTelemetryReport(SMnode* pMnode) {
STelemMgmt* pMgmt = &pMnode->telemMgmt;
char buf[128] = {0}; char buf[128] = {0};
uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER); uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER);
if (ip == 0xffffffff) { if (ip == 0xffffffff) {
mTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno)); mTrace("failed to get IP address of " TELEMETRY_SERVER " since :%s", strerror(errno));
return; return;
} }
SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0); SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0);
...@@ -203,19 +198,18 @@ static void mndSendTelemetryReport() { ...@@ -203,19 +198,18 @@ static void mndSendTelemetryReport() {
return; return;
} }
int32_t clusterId = 0; char clusterName[64] = {0};
char clusterIdStr[20] = {0}; mndGetClusterName(pMnode, clusterName, sizeof(clusterName));
snprintf(clusterIdStr, sizeof(clusterIdStr), "%d", clusterId);
SBufferWriter bw = tbufInitWriter(NULL, false); SBufferWriter bw = tbufInitWriter(NULL, false);
mndBeginObject(&bw); mndBeginObject(&bw);
mndAddStringField(&bw, "instanceId", clusterIdStr); mndAddStringField(&bw, "instanceId", clusterName);
mndAddIntField(&bw, "reportVersion", 1); mndAddIntField(&bw, "reportVersion", 1);
mndAddOsInfo(&bw); mndAddOsInfo(pMnode, &bw);
mndAddCpuInfo(&bw); mndAddCpuInfo(pMnode, &bw);
mndAddMemoryInfo(&bw); mndAddMemoryInfo(pMnode, &bw);
mndAddVersionInfo(&bw); mndAddVersionInfo(pMnode, &bw);
mndAddRuntimeInfo(&bw); mndAddRuntimeInfo(pMnode, &bw);
mndCloseObject(&bw); mndCloseObject(&bw);
const char* header = const char* header =
...@@ -241,23 +235,26 @@ static void mndSendTelemetryReport() { ...@@ -241,23 +235,26 @@ static void mndSendTelemetryReport() {
} }
static void* mndTelemThreadFp(void* param) { static void* mndTelemThreadFp(void* param) {
SMnode* pMnode = param;
STelemMgmt* pMgmt = &pMnode->telemMgmt;
struct timespec end = {0}; struct timespec end = {0};
clock_gettime(CLOCK_REALTIME, &end); clock_gettime(CLOCK_REALTIME, &end);
end.tv_sec += 300; // wait 5 minutes before send first report end.tv_sec += 300; // wait 5 minutes before send first report
setThreadName("mnd-telem"); setThreadName("mnd-telem");
while (!tsTelem.exit) { while (!pMgmt->exit) {
int32_t r = 0; int32_t r = 0;
struct timespec ts = end; struct timespec ts = end;
pthread_mutex_lock(&tsTelem.lock); pthread_mutex_lock(&pMgmt->lock);
r = pthread_cond_timedwait(&tsTelem.cond, &tsTelem.lock, &ts); r = pthread_cond_timedwait(&pMgmt->cond, &pMgmt->lock, &ts);
pthread_mutex_unlock(&tsTelem.lock); pthread_mutex_unlock(&pMgmt->lock);
if (r == 0) break; if (r == 0) break;
if (r != ETIMEDOUT) continue; if (r != ETIMEDOUT) continue;
if (mndIsMaster(NULL)) { if (mndIsMaster(pMnode)) {
mndSendTelemetryReport(); mndSendTelemetryReport(pMnode);
} }
end.tv_sec += REPORT_INTERVAL; end.tv_sec += REPORT_INTERVAL;
} }
...@@ -265,35 +262,39 @@ static void* mndTelemThreadFp(void* param) { ...@@ -265,35 +262,39 @@ static void* mndTelemThreadFp(void* param) {
return NULL; return NULL;
} }
static void mndGetEmail(char* filepath) { static void mndGetEmail(SMnode* pMnode, char* filepath) {
STelemMgmt* pMgmt = &pMnode->telemMgmt;
int32_t fd = taosOpenFileRead(filepath); int32_t fd = taosOpenFileRead(filepath);
if (fd < 0) { if (fd < 0) {
return; return;
} }
if (taosReadFile(fd, (void*)tsTelem.email, TSDB_FQDN_LEN) < 0) { if (taosReadFile(fd, (void*)pMgmt->email, TSDB_FQDN_LEN) < 0) {
mError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno)); mError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
} }
taosCloseFile(fd); taosCloseFile(fd);
} }
int32_t mndInitTelem(SMnode *pMnode) { int32_t mndInitTelem(SMnode* pMnode) {
tsTelem.enable = tsEnableTelemetryReporting; STelemMgmt* pMgmt = &pMnode->telemMgmt;
if (!tsTelem.enable) return 0; pMgmt->enable = pMnode->cfg.enableTelem;
if (!pMgmt->enable) return 0;
tsTelem.exit = 0; pMgmt->exit = 0;
pthread_mutex_init(&tsTelem.lock, NULL); pthread_mutex_init(&pMgmt->lock, NULL);
pthread_cond_init(&tsTelem.cond, NULL); pthread_cond_init(&pMgmt->cond, NULL);
tsTelem.email[0] = 0; pMgmt->email[0] = 0;
mndGetEmail("/usr/local/taos/email"); mndGetEmail(pMnode, "/usr/local/taos/email");
pthread_attr_t attr; pthread_attr_t attr;
pthread_attr_init(&attr); pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
int32_t code = pthread_create(&tsTelem.thread, &attr, mndTelemThreadFp, NULL); int32_t code = pthread_create(&pMgmt->thread, &attr, mndTelemThreadFp, pMnode);
pthread_attr_destroy(&attr); pthread_attr_destroy(&attr);
if (code != 0) { if (code != 0) {
mTrace("failed to create telemetry thread since :%s", strerror(code)); mTrace("failed to create telemetry thread since :%s", strerror(code));
...@@ -303,18 +304,19 @@ int32_t mndInitTelem(SMnode *pMnode) { ...@@ -303,18 +304,19 @@ int32_t mndInitTelem(SMnode *pMnode) {
return 0; return 0;
} }
void mndCleanupTelem(SMnode *pMnode) { void mndCleanupTelem(SMnode* pMnode) {
if (!tsTelem.enable) return; STelemMgmt* pMgmt = &pMnode->telemMgmt;
if (!pMgmt->enable) return;
if (taosCheckPthreadValid(tsTelem.thread)) { if (taosCheckPthreadValid(pMgmt->thread)) {
pthread_mutex_lock(&tsTelem.lock); pthread_mutex_lock(&pMgmt->lock);
tsTelem.exit = 1; pMgmt->exit = 1;
pthread_cond_signal(&tsTelem.cond); pthread_cond_signal(&pMgmt->cond);
pthread_mutex_unlock(&tsTelem.lock); pthread_mutex_unlock(&pMgmt->lock);
pthread_join(tsTelem.thread, NULL); pthread_join(pMgmt->thread, NULL);
} }
pthread_mutex_destroy(&tsTelem.lock); pthread_mutex_destroy(&pMgmt->lock);
pthread_cond_destroy(&tsTelem.cond); pthread_cond_destroy(&pMgmt->cond);
} }
...@@ -203,22 +203,25 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -203,22 +203,25 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp; pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp; pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp;
pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp; pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp;
pMnode->sver = pOption->sver; pMnode->cfg.sver = pOption->cfg.sver;
pMnode->statusInterval = pOption->statusInterval; pMnode->cfg.enableTelem = pOption->cfg.enableTelem;
pMnode->mnodeEqualVnodeNum = pOption->mnodeEqualVnodeNum; pMnode->cfg.statusInterval = pOption->cfg.statusInterval;
pMnode->shellActivityTimer = pOption->shellActivityTimer; pMnode->cfg.mnodeEqualVnodeNum = pOption->cfg.mnodeEqualVnodeNum;
pMnode->timezone = strdup(pOption->timezone); pMnode->cfg.shellActivityTimer = pOption->cfg.shellActivityTimer;
pMnode->locale = strdup(pOption->locale); pMnode->cfg.timezone = strdup(pOption->cfg.timezone);
pMnode->charset = strdup(pOption->charset); pMnode->cfg.locale = strdup(pOption->cfg.locale);
pMnode->cfg.charset = strdup(pOption->cfg.charset);
pMnode->cfg.gitinfo = strdup(pOption->cfg.gitinfo);
pMnode->cfg.buildinfo = strdup(pOption->cfg.buildinfo);
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 || pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 ||
pMnode->statusInterval < 1 || pOption->mnodeEqualVnodeNum < 0) { pMnode->cfg.statusInterval < 1 || pOption->cfg.mnodeEqualVnodeNum < 0) {
terrno = TSDB_CODE_MND_INVALID_OPTIONS; terrno = TSDB_CODE_MND_INVALID_OPTIONS;
return -1; return -1;
} }
if (pMnode->timezone == NULL || pMnode->locale == NULL || pMnode->charset == NULL) { if (pMnode->cfg.timezone == NULL || pMnode->cfg.locale == NULL || pMnode->cfg.charset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -289,9 +292,11 @@ void mndClose(SMnode *pMnode) { ...@@ -289,9 +292,11 @@ void mndClose(SMnode *pMnode) {
mDebug("start to close mnode"); mDebug("start to close mnode");
mndCleanupSteps(pMnode, -1); mndCleanupSteps(pMnode, -1);
tfree(pMnode->path); tfree(pMnode->path);
tfree(pMnode->charset); tfree(pMnode->cfg.charset);
tfree(pMnode->locale); tfree(pMnode->cfg.locale);
tfree(pMnode->timezone); tfree(pMnode->cfg.timezone);
tfree(pMnode->cfg.gitinfo);
tfree(pMnode->cfg.buildinfo);
tfree(pMnode); tfree(pMnode);
mDebug("mnode is closed"); mDebug("mnode is closed");
} }
......
...@@ -86,6 +86,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, "Invalid Ref ID") ...@@ -86,6 +86,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_INVALID_ID, "Invalid Ref ID")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, "Ref is already there") TAOS_DEFINE_ERROR(TSDB_CODE_REF_ALREADY_EXIST, "Ref is already there")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_NUMBER, "Invalid version number")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_STRING, "Invalid version string")
TAOS_DEFINE_ERROR(TSDB_CODE_VERSION_NOT_COMPATIBLE, "Version not compatible")
//client //client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tdef.h"
#include "ulog.h"
int32_t taosVersionStrToInt(const char *vstr, int32_t *vint) {
if (vstr == NULL) {
terrno = TSDB_CODE_INVALID_VERSION_STRING;
return -1;
}
int32_t vnum[4] = {0};
int32_t len = strlen(vstr);
char tmp[16] = {0};
for (int32_t spos = 0, tpos = 0, vpos = 0; spos < len && vpos < 4; ++spos) {
if (vstr[spos] != '.') {
tmp[spos - tpos] = vstr[spos];
} else {
vnum[vpos] = atoi(tmp);
memset(tmp, 0, sizeof(tmp));
vpos++;
tpos = spos + 1;
}
}
if (vnum[0] <= 0) {
terrno = TSDB_CODE_INVALID_VERSION_STRING;
return -1;
}
*vint = vnum[0] * 1000000 + vnum[1] * 10000 + vnum[2] * 100 + vnum[3];
return 0;
}
int32_t taosVersionIntToStr(int32_t vint, char *vstr, int32_t len) {
int32_t s1 = (vint % 100000000) / 1000000;
int32_t s2 = (vint % 1000000) / 10000;
int32_t s3 = (vint % 10000) / 100;
int32_t s4 = vint % 100;
if (s1 <= 0) {
terrno = TSDB_CODE_INVALID_VERSION_NUMBER;
return -1;
}
snprintf(vstr, len, "%02d.%02d.%02d.%02d", s1, s2, s3, s4);
return 0;
}
int32_t taosCheckVersionCompatible(int32_t clientVer, int32_t serverVer, int32_t comparedSegments) {
switch (comparedSegments) {
case 4:
break;
case 3:
clientVer %= 100;
serverVer %= 100;
break;
case 2:
clientVer %= 10000;
serverVer %= 10000;
break;
case 1:
clientVer %= 1000000;
serverVer %= 1000000;
break;
default:
terrno = TSDB_CODE_INVALID_VERSION_NUMBER;
return -1;
}
if (clientVer == serverVer) {
return 0;
} else {
terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE;
return -1;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册