diff --git a/source/server/dnode/inc/dnodeTelemetry.h b/source/server/dnode/inc/dnodeTelemetry.h new file mode 100644 index 0000000000000000000000000000000000000000..e3e097d2f237240fa77f3ef85b5be322c7112c53 --- /dev/null +++ b/source/server/dnode/inc/dnodeTelemetry.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2020 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DNODE_TELEMETRY_H_ +#define _TD_DNODE_TELEMETRY_H_ + +#ifdef __cplusplus +extern "C" { +#endif +#include "dnodeInt.h" + +/* + * sem_timedwait is NOT implemented on MacOSX + * thus we use pthread_mutex_t/pthread_cond_t to simulate + */ +typedef struct DnTelem { + Dnode * dnode; + bool enable; + pthread_mutex_t lock; + pthread_cond_t cond; + volatile int32_t exit; + pthread_t thread; + char email[TSDB_FQDN_LEN]; +} DnTelem; + +int32_t dnodeInitTelemetry(Dnode *dnode, DnTelem **telem); +void dnodeCleanupTelemetry(DnTelem **telem); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DNODE_TELEMETRY_H_*/ diff --git a/source/server/dnode/src/dnodeTelemetry.c b/source/server/dnode/src/dnodeTelemetry.c new file mode 100644 index 0000000000000000000000000000000000000000..8c2d4a431e9cd496b2ae2aa407faa2685bc98d57 --- /dev/null +++ b/source/server/dnode/src/dnodeTelemetry.c @@ -0,0 +1,323 @@ +/* + * Copyright (c) 2020 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "osTime.h" +#include "tbuffer.h" +#include "tglobal.h" +#include "tsocket.h" +#include "dnodeCfg.h" +#include "dnodeTelemetry.h" +#include "mnode.h" + +#define TELEMETRY_SERVER "telemetry.taosdata.com" +#define TELEMETRY_PORT 80 +#define REPORT_INTERVAL 86400 + +static void dnodeBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); } + +static void dnodeCloseObject(SBufferWriter* bw) { + size_t len = tbufTell(bw); + if (tbufGetData(bw, false)[len - 1] == ',') { + tbufWriteCharAt(bw, len - 1, '}'); + } else { + tbufWriteChar(bw, '}'); + } + tbufWriteChar(bw, ','); +} + +#if 0 +static void beginArray(SBufferWriter* bw) { + tbufWriteChar(bw, '['); +} + +static void closeArray(SBufferWriter* bw) { + size_t len = tbufTell(bw); + if (tbufGetData(bw, false)[len - 1] == ',') { + tbufWriteCharAt(bw, len - 1, ']'); + } else { + tbufWriteChar(bw, ']'); + } + tbufWriteChar(bw, ','); +} +#endif + +static void dnodeWriteString(SBufferWriter* bw, const char* str) { + tbufWriteChar(bw, '"'); + tbufWrite(bw, str, strlen(str)); + tbufWriteChar(bw, '"'); +} + +static void dnodeAddIntField(SBufferWriter* bw, const char* k, int64_t v) { + dnodeWriteString(bw, k); + tbufWriteChar(bw, ':'); + char buf[32]; + sprintf(buf, "%" PRId64, v); + tbufWrite(bw, buf, strlen(buf)); + tbufWriteChar(bw, ','); +} + +static void dnodeAddStringField(SBufferWriter* bw, const char* k, const char* v) { + dnodeWriteString(bw, k); + tbufWriteChar(bw, ':'); + dnodeWriteString(bw, v); + tbufWriteChar(bw, ','); +} + +static void dnodeAddCpuInfo(SBufferWriter* bw) { + char* line = NULL; + size_t size = 0; + int32_t done = 0; + + FILE* fp = fopen("/proc/cpuinfo", "r"); + if (fp == NULL) { + return; + } + + while (done != 3 && (size = tgetline(&line, &size, fp)) != -1) { + line[size - 1] = '\0'; + if (((done & 1) == 0) && strncmp(line, "model name", 10) == 0) { + const char* v = strchr(line, ':') + 2; + dnodeAddStringField(bw, "cpuModel", v); + done |= 1; + } else if (((done & 2) == 0) && strncmp(line, "cpu cores", 9) == 0) { + const char* v = strchr(line, ':') + 2; + dnodeWriteString(bw, "numOfCpu"); + tbufWriteChar(bw, ':'); + tbufWrite(bw, v, strlen(v)); + tbufWriteChar(bw, ','); + done |= 2; + } + } + + free(line); + fclose(fp); +} + +static void dnodeAddOsInfo(SBufferWriter* bw) { + char* line = NULL; + size_t size = 0; + + FILE* fp = fopen("/etc/os-release", "r"); + if (fp == NULL) { + return; + } + + while ((size = tgetline(&line, &size, fp)) != -1) { + line[size - 1] = '\0'; + if (strncmp(line, "PRETTY_NAME", 11) == 0) { + const char* p = strchr(line, '=') + 1; + if (*p == '"') { + p++; + line[size - 2] = 0; + } + dnodeAddStringField(bw, "os", p); + break; + } + } + + free(line); + fclose(fp); +} + +static void dnodeAddMemoryInfo(SBufferWriter* bw) { + char* line = NULL; + size_t size = 0; + + FILE* fp = fopen("/proc/meminfo", "r"); + if (fp == NULL) { + return; + } + + while ((size = tgetline(&line, &size, fp)) != -1) { + line[size - 1] = '\0'; + if (strncmp(line, "MemTotal", 8) == 0) { + const char* p = strchr(line, ':') + 1; + while (*p == ' ') p++; + dnodeAddStringField(bw, "memory", p); + break; + } + } + + free(line); + fclose(fp); +} + +static void dnodeAddVersionInfo(DnTelem* telem, SBufferWriter* bw) { + dnodeAddStringField(bw, "version", version); + dnodeAddStringField(bw, "buildInfo", buildinfo); + dnodeAddStringField(bw, "gitInfo", gitinfo); + dnodeAddStringField(bw, "email", telem->email); +} + +static void dnodeAddRuntimeInfo(DnTelem* telem, SBufferWriter* bw) { + SMnodeStat stat = {0}; + if (mnodeGetStatistics(telem->dnode->mnode, &stat) != 0) { + return; + } + + dnodeAddIntField(bw, "numOfDnode", stat.numOfDnode); + dnodeAddIntField(bw, "numOfMnode", stat.numOfMnode); + dnodeAddIntField(bw, "numOfVgroup", stat.numOfVgroup); + dnodeAddIntField(bw, "numOfDatabase", stat.numOfDatabase); + dnodeAddIntField(bw, "numOfSuperTable", stat.numOfSuperTable); + dnodeAddIntField(bw, "numOfChildTable", stat.numOfChildTable); + dnodeAddIntField(bw, "numOfColumn", stat.numOfColumn); + dnodeAddIntField(bw, "numOfPoint", stat.totalPoints); + dnodeAddIntField(bw, "totalStorage", stat.totalStorage); + dnodeAddIntField(bw, "compStorage", stat.compStorage); +} + +static void dnodeSendTelemetryReport(DnTelem* telem) { + char buf[128] = {0}; + uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER); + if (ip == 0xffffffff) { + dTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno)); + return; + } + SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0); + if (fd < 0) { + dTrace("failed to create socket for telemetry, reason:%s", strerror(errno)); + return; + } + + SBufferWriter bw = tbufInitWriter(NULL, false); + dnodeBeginObject(&bw); + dnodeAddStringField(&bw, "instanceId", telem->dnode->cfg->clusterId); + dnodeAddIntField(&bw, "reportVersion", 1); + dnodeAddOsInfo(&bw); + dnodeAddCpuInfo(&bw); + dnodeAddMemoryInfo(&bw); + dnodeAddVersionInfo(telem, &bw); + dnodeAddRuntimeInfo(telem, &bw); + dnodeCloseObject(&bw); + + const char* header = + "POST /report HTTP/1.1\n" + "Host: " TELEMETRY_SERVER + "\n" + "Content-Type: application/json\n" + "Content-Length: "; + + taosWriteSocket(fd, header, (int32_t)strlen(header)); + int32_t contLen = (int32_t)(tbufTell(&bw) - 1); + sprintf(buf, "%d\n\n", contLen); + taosWriteSocket(fd, buf, (int32_t)strlen(buf)); + taosWriteSocket(fd, tbufGetData(&bw, false), contLen); + tbufCloseWriter(&bw); + + // read something to avoid nginx error 499 + if (taosReadSocket(fd, buf, 10) < 0) { + dTrace("failed to receive response since %s", strerror(errno)); + } + + taosCloseSocket(fd); +} + +static void* dnodeTelemThreadFp(void* param) { + DnTelem* telem = param; + + struct timespec end = {0}; + clock_gettime(CLOCK_REALTIME, &end); + end.tv_sec += 300; // wait 5 minutes before send first report + + setThreadName("dnode-telem"); + + while (!telem->exit) { + int32_t r = 0; + struct timespec ts = end; + pthread_mutex_lock(&telem->lock); + r = pthread_cond_timedwait(&telem->cond, &telem->lock, &ts); + pthread_mutex_unlock(&telem->lock); + if (r == 0) break; + if (r != ETIMEDOUT) continue; + + if (mnodeIsServing(telem->dnode->mnode)) { + dnodeSendTelemetryReport(telem); + } + end.tv_sec += REPORT_INTERVAL; + } + + return NULL; +} + +static void dnodeGetEmail(DnTelem* telem, char* filepath) { + int32_t fd = open(filepath, O_RDONLY); + if (fd < 0) { + return; + } + + if (taosRead(fd, (void*)telem->email, TSDB_FQDN_LEN) < 0) { + dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno)); + } + + taosClose(fd); +} + +int32_t dnodeInitTelemetry(Dnode* dnode, DnTelem** out) { + DnTelem* telem = calloc(1, sizeof(DnTelem)); + if (telem == NULL) return -1; + + telem->dnode = dnode; + telem->enable = tsEnableTelemetryReporting; + *out = telem; + + if (!telem->enable) return 0; + + telem->exit = 0; + pthread_mutex_init(&telem->lock, NULL); + pthread_cond_init(&telem->cond, NULL); + telem->email[0] = 0; + + dnodeGetEmail(telem, "/usr/local/taos/email"); + + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + int32_t code = pthread_create(&telem->thread, &attr, dnodeTelemThreadFp, telem); + pthread_attr_destroy(&attr); + if (code != 0) { + dTrace("failed to create telemetry thread since :%s", strerror(code)); + } + + dInfo("dnode telemetry is initialized"); + return 0; +} + +void dnodeCleanupTelemetry(DnTelem** out) { + DnTelem* telem = *out; + *out = NULL; + + if (!telem->enable) { + free(telem); + return; + } + + if (taosCheckPthreadValid(telem->thread)) { + pthread_mutex_lock(&telem->lock); + telem->exit = 1; + pthread_cond_signal(&telem->cond); + pthread_mutex_unlock(&telem->lock); + + pthread_join(telem->thread, NULL); + } + + pthread_mutex_destroy(&telem->lock); + pthread_cond_destroy(&telem->cond); + + free(telem); +}