diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index b8d7297ee0caf3157890061fed757fbe7431231a..a04b5f32b94b61a49ab3934be2b6e425c00df66a 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -32,6 +32,7 @@ extern uint16_t tsSyncPort; extern int32_t tsStatusInterval; extern int32_t tsNumOfMnodes; extern int32_t tsEnableVnodeBak; +extern int32_t tsEnableTelemetryReporting; // common extern int tsRpcTimer; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 0d071454a7e163996c077fc27e1ddcd85682e7d6..c3dc078428cfa66298444d29ff669fc05e7f65fc 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -40,6 +40,7 @@ uint16_t tsSyncPort = 6040; int32_t tsStatusInterval = 1; // second int32_t tsNumOfMnodes = 3; int32_t tsEnableVnodeBak = 1; +int32_t tsEnableTelemetryReporting = 1; // common int32_t tsRpcTimer = 1000; @@ -430,6 +431,16 @@ static void doInitGlobalConfig() { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "telemetryReporting"; + cfg.ptr = &tsEnableTelemetryReporting; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 1; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "balance"; cfg.ptr = &tsEnableBalance; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/dnode/inc/dnodeTelemetry.h b/src/dnode/inc/dnodeTelemetry.h new file mode 100644 index 0000000000000000000000000000000000000000..6fb62556ae50ec3759c7dc2260d16f36d1daf025 --- /dev/null +++ b/src/dnode/inc/dnodeTelemetry.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_TELEMETRY_H +#define TDENGINE_DNODE_TELEMETRY_H + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t dnodeInitTelemetry(); +void dnodeCleanupTelemetry(); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 6476bb78314f76ae02cbaae0dd04547bc7565313..f521fbe02bf611295fae22212080d97840c085a4 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -30,6 +30,7 @@ #include "dnodeMWrite.h" #include "dnodeMPeer.h" #include "dnodeShell.h" +#include "dnodeTelemetry.h" static int32_t dnodeInitStorage(); static void dnodeCleanupStorage(); @@ -47,18 +48,19 @@ typedef struct { } SDnodeComponent; static const SDnodeComponent tsDnodeComponents[] = { - {"storage", dnodeInitStorage, dnodeCleanupStorage}, - {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, - {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, - {"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead}, - {"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite}, - {"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer}, - {"client", dnodeInitClient, dnodeCleanupClient}, - {"server", dnodeInitServer, dnodeCleanupServer}, - {"mgmt", dnodeInitMgmt, dnodeCleanupMgmt}, - {"modules", dnodeInitModules, dnodeCleanupModules}, - {"mgmt-tmr",dnodeInitMgmtTimer, dnodeCleanupMgmtTimer}, - {"shell", dnodeInitShell, dnodeCleanupShell} + {"storage", dnodeInitStorage, dnodeCleanupStorage}, + {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, + {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, + {"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead}, + {"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite}, + {"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer}, + {"client", dnodeInitClient, dnodeCleanupClient}, + {"server", dnodeInitServer, dnodeCleanupServer}, + {"mgmt", dnodeInitMgmt, dnodeCleanupMgmt}, + {"modules", dnodeInitModules, dnodeCleanupModules}, + {"mgmt-tmr", dnodeInitMgmtTimer, dnodeCleanupMgmtTimer}, + {"shell", dnodeInitShell, dnodeCleanupShell}, + {"telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry}, }; static int dnodeCreateDir(const char *dir) { diff --git a/src/dnode/src/dnodeTelemetry.c b/src/dnode/src/dnodeTelemetry.c new file mode 100644 index 0000000000000000000000000000000000000000..6963285da1ff09d5cd5c35ed9c451b07c92fbce3 --- /dev/null +++ b/src/dnode/src/dnodeTelemetry.c @@ -0,0 +1,289 @@ +/* + * Copyright (c) 2020 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "tglobal.h" +#include "tutil.h" +#include "ttime.h" +#include "tsocket.h" +#include "tbuffer.h" +#include "mnode.h" +#include "mnodeCluster.h" +#include "mnodeSdb.h" +#include "dnode.h" +#include "dnodeInt.h" +#include "dnodeTelemetry.h" + +static sem_t tsExitSem; +static pthread_t tsTelemetryThread; + +#if 1 + #define TELEMETRY_SERVER "telemetry.taosdata.com" + #define TELEMETRY_PORT 80 + #define REPORT_INTERVAL 86400 +#else + #define TELEMETRY_SERVER "localhost" + #define TELEMETRY_PORT 80 + #define REPORT_INTERVAL 5 +#endif + + +static void beginObject(SBufferWriter* bw) { + tbufWriteChar(bw, '{'); +} + +static void closeObject(SBufferWriter* bw) { + size_t len = tbufTell(bw); + if (tbufGetData(bw, false)[len - 1] == ',') { + tbufWriteCharAt(bw, len - 1, '}'); + } else { + tbufWriteChar(bw, '}'); + } + tbufWriteChar(bw, ','); +} + +#if 0 +static void beginArray(SBufferWriter* bw) { + tbufWriteChar(bw, '['); +} + +static void closeArray(SBufferWriter* bw) { + size_t len = tbufTell(bw); + if (tbufGetData(bw, false)[len - 1] == ',') { + tbufWriteCharAt(bw, len - 1, ']'); + } else { + tbufWriteChar(bw, ']'); + } + tbufWriteChar(bw, ','); +} +#endif + +static void writeString(SBufferWriter* bw, const char* str) { + tbufWriteChar(bw, '"'); + tbufWrite(bw, str, strlen(str)); + tbufWriteChar(bw, '"'); +} + +static void addIntField(SBufferWriter* bw, const char* k, int64_t v) { + writeString(bw, k); + tbufWriteChar(bw, ':'); + char buf[32]; + sprintf(buf, "%" PRId64, v); + tbufWrite(bw, buf, strlen(buf)); + tbufWriteChar(bw, ','); +} + +static void addStringField(SBufferWriter* bw, const char* k, const char* v) { + writeString(bw, k); + tbufWriteChar(bw, ':'); + writeString(bw, v); + tbufWriteChar(bw, ','); +} + +static void addCpuInfo(SBufferWriter* bw) { + char * line = NULL; + size_t size = 0; + int done = 0; + + FILE* fp = fopen("/proc/cpuinfo", "r"); + if (fp == NULL) { + return; + } + + while (done != 3 && (size = getline(&line, &size, fp)) != -1) { + line[size - 1] = '\0'; + if (((done&1) == 0) && strncmp(line, "model name", 10) == 0) { + const char* v = strchr(line, ':') + 2; + addStringField(bw, "cpuModel", v); + done |= 1; + } else if (((done&2)==0) && strncmp(line, "cpu cores", 9) == 0) { + const char* v = strchr(line, ':') + 2; + writeString(bw, "numOfCpu"); + tbufWriteChar(bw, ':'); + tbufWrite(bw, v, strlen(v)); + tbufWriteChar(bw, ','); + done |= 2; + } + } + + free(line); + fclose(fp); +} + +static void addOsInfo(SBufferWriter* bw) { + char * line = NULL; + size_t size = 0; + + FILE* fp = fopen("/etc/os-release", "r"); + if (fp == NULL) { + return; + } + + while ((size = getline(&line, &size, fp)) != -1) { + line[size - 1] = '\0'; + if (strncmp(line, "PRETTY_NAME", 11) == 0) { + const char* p = strchr(line, '=') + 1; + if (*p == '"') { + p++; + line[size - 2] = 0; + } + addStringField(bw, "os", p); + break; + } + } + + free(line); + fclose(fp); +} + +static void addMemoryInfo(SBufferWriter* bw) { + char * line = NULL; + size_t size = 0; + + FILE* fp = fopen("/proc/meminfo", "r"); + if (fp == NULL) { + return; + } + + while ((size = getline(&line, &size, fp)) != -1) { + line[size - 1] = '\0'; + if (strncmp(line, "MemTotal", 8) == 0) { + const char* p = strchr(line, ':') + 1; + while (*p == ' ') p++; + addStringField(bw, "memory", p); + break; + } + } + + free(line); + fclose(fp); +} + +static void addVersionInfo(SBufferWriter* bw) { + addStringField(bw, "version", version); + addStringField(bw, "buildInfo", buildinfo); + addStringField(bw, "gitInfo", gitinfo); + //addStringField(&bw, "installAt", "2020-08-01T00:00:00Z"); +} + +static void addRuntimeInfo(SBufferWriter* bw) { + addIntField(bw, "clusterId", mnodeGetClusterId()); + // addIntField(&bw, "numOfDnode", 1); + // addIntField(&bw, "numOfVnode", 1); + // addIntField(&bw, "numOfStable", 1); + // addIntField(&bw, "numOfTable", 1); + // addIntField(&bw, "numOfRows", 1); + // addStringField(&bw, "startAt", "2020-08-01T00:00:00Z"); + // addStringField(&bw, "memoryUsage", "10240 kB"); + // addStringField(&bw, "diskUsage", "10240 MB"); +} + +static void sendTelemetryReport() { + char buf[128]; + uint32_t ip = taosGetIpFromFqdn(TELEMETRY_SERVER); + if (ip == 0xffffffff) { + dError("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno)); + return; + } + int fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0); + if (fd < 0) { + dError("failed to create socket for telemetry, reason:%s", strerror(errno)); + return; + } + + SBufferWriter bw = tbufInitWriter(NULL, false); + beginObject(&bw); + addIntField(&bw, "reportVersion", 1); + addOsInfo(&bw); + addCpuInfo(&bw); + addMemoryInfo(&bw); + addVersionInfo(&bw); + addRuntimeInfo(&bw); + closeObject(&bw); + + const char* header = "POST /report HTTP/1.1\n" + "Host: " TELEMETRY_SERVER "\n" + "Content-Type: application/json\n" + "Content-Length: "; + + taosWriteSocket(fd, header, strlen(header)); + int contLen = tbufTell(&bw) - 1; + sprintf(buf, "%d\n\n", contLen); + taosWriteSocket(fd, buf, strlen(buf)); + taosWriteSocket(fd, tbufGetData(&bw, false), contLen); + tbufCloseWriter(&bw); + + taosReadSocket(fd, buf, 10); // read something to avoid nginx error 499 + taosCloseSocket(fd); +} + +static void* telemetryThread(void* param) { + int timeToWait = 0; + while (1) { + if (timeToWait <= 0) { + if (sdbIsMaster()) { + sendTelemetryReport(); + } + timeToWait = REPORT_INTERVAL; + } + + int startAt = taosGetTimestampSec(); + struct timespec timeout = {.tv_sec = timeToWait, .tv_nsec = 0}; + if (sem_timedwait(&tsExitSem, &timeout) == 0) { + break; + } + timeToWait -= (taosGetTimestampSec() - startAt); + } + + return NULL; +} + +int32_t dnodeInitTelemetry() { + if (!tsEnableTelemetryReporting) { + return 0; + } + + if (sem_init(&tsExitSem, 0, 0) == -1) { + // just log the error, it is ok for telemetry to fail + dError("failed to create semaphore for telemetry, reason:%s", strerror(errno)); + return 0; + } + + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + int32_t code = pthread_create(&tsTelemetryThread, &attr, telemetryThread, NULL); + pthread_attr_destroy(&attr); + if (code != 0) { + dError("failed to create telemetry thread, reason:%s", strerror(errno)); + } + + return 0; +} + +void dnodeCleanupTelemetry() { + if (!tsEnableTelemetryReporting) { + return; + } + + if (tsTelemetryThread) { + sem_post(&tsExitSem); + pthread_join(tsTelemetryThread, NULL); + sem_destroy(&tsExitSem); + } +} \ No newline at end of file