mnodeTelem.c 8.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2020 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#include "mnodeTelem.h"
18 19
#include "tbuffer.h"
#include "tglobal.h"
S
Shengliang Guan 已提交
20
#include "mnodeSync.h"
21 22 23 24 25

// Host name of the server that receives the telemetry report (also sent as the HTTP Host header).
#define TELEMETRY_SERVER "telemetry.taosdata.com"
// TCP port used when connecting to TELEMETRY_SERVER.
#define TELEMETRY_PORT 80
// Seconds between two consecutive reports (86400 s = 24 hours).
#define REPORT_INTERVAL 86400

26 27 28 29 30 31 32 33 34 35 36 37 38
/*
 * State shared between the telemetry reporter thread and init/cleanup.
 *
 * sem_timedwait is NOT implemented on MacOSX,
 * thus we use pthread_mutex_t/pthread_cond_t to simulate it.
 */
static struct {
  bool             enable;  // copy of tsEnableTelemetryReporting taken in mnodeInitTelem
  pthread_mutex_t  lock;    // mutex paired with `cond`
  pthread_cond_t   cond;    // signalled by mnodeCleanupTelem to wake the reporter thread
  volatile int32_t exit;    // set to 1 to ask the reporter thread to stop
  pthread_t        thread;  // reporter thread running mnodeTelemThreadFp
  char             email[TSDB_FQDN_LEN];  // filled by mnodeGetEmail, reported as the "email" field
} tsTelem;

39
/* Start a JSON object by emitting its opening brace. */
static void mnodeBeginObject(SBufferWriter* bw) {
  tbufWriteChar(bw, '{');
}
40

41
/*
 * Finish the JSON object currently being written.
 *
 * The field writers in this file leave a trailing ',' behind, so when the
 * last byte is a comma it is overwritten with '}' instead of appending one.
 * A ',' is then appended after the object; mnodeSendTelemetryReport excludes
 * that final byte from the content it sends.
 */
static void mnodeCloseObject(SBufferWriter* bw) {
  size_t len = tbufTell(bw);

  // guard against an empty buffer: `len - 1` would wrap around and read out of bounds
  if (len > 0 && tbufGetData(bw, false)[len - 1] == ',') {
    tbufWriteCharAt(bw, len - 1, '}');
  } else {
    tbufWriteChar(bw, '}');
  }
  tbufWriteChar(bw, ',');
}

#if 0
// NOTE: the array helpers in this section are compiled out by `#if 0`.

// Start a JSON array by emitting '['.
static void beginArray(SBufferWriter* bw) {
  tbufWriteChar(bw, '[');
}

// Close a JSON array: a trailing ',' is replaced by ']', otherwise ']' is
// appended; a ',' is then written after the array.
static void closeArray(SBufferWriter* bw) {
  size_t len = tbufTell(bw);
  if (tbufGetData(bw, false)[len - 1] == ',') {
    tbufWriteCharAt(bw, len - 1, ']');
  } else {
    tbufWriteChar(bw, ']');
  }
  tbufWriteChar(bw, ',');
}
#endif

67
/*
 * Write `str` as a JSON string literal (surrounded by double quotes).
 *
 * The strings written through this helper include text read from files
 * (/proc/cpuinfo, /etc/os-release, the email file), so characters that may
 * not appear raw inside a JSON string -- '"', '\\' and control characters
 * below 0x20 -- are escaped. A NULL `str` is written as an empty string.
 */
static void mnodeWriteString(SBufferWriter* bw, const char* str) {
  static const char hex[] = "0123456789abcdef";

  tbufWriteChar(bw, '"');
  for (const char* p = (str != NULL) ? str : ""; *p != '\0'; ++p) {
    unsigned char c = (unsigned char)*p;
    switch (c) {
      case '"':
        tbufWrite(bw, "\\\"", 2);
        break;
      case '\\':
        tbufWrite(bw, "\\\\", 2);
        break;
      case '\n':
        tbufWrite(bw, "\\n", 2);
        break;
      case '\r':
        tbufWrite(bw, "\\r", 2);
        break;
      case '\t':
        tbufWrite(bw, "\\t", 2);
        break;
      default:
        if (c < 0x20) {
          // remaining control characters use the generic \u00XX form
          char esc[6] = {'\\', 'u', '0', '0', hex[c >> 4], hex[c & 0x0f]};
          tbufWrite(bw, esc, sizeof(esc));
        } else {
          tbufWriteChar(bw, (char)c);
        }
        break;
    }
  }
  tbufWriteChar(bw, '"');
}

73 74
/* Append `"k":v,` to the buffer, with `v` rendered as a decimal integer. */
static void mnodeAddIntField(SBufferWriter* bw, const char* k, int64_t v) {
  // 32 bytes is enough for any int64_t in decimal, sign and terminator included
  char digits[32];
  sprintf(digits, "%" PRId64, v);

  mnodeWriteString(bw, k);
  tbufWriteChar(bw, ':');
  tbufWrite(bw, digits, strlen(digits));
  tbufWriteChar(bw, ',');
}

82 83
/* Append `"k":"v",` to the buffer; key and value are both written as quoted strings. */
static void mnodeAddStringField(SBufferWriter* bw, const char* k, const char* v) {
  // key and the key/value separator
  mnodeWriteString(bw, k);
  tbufWriteChar(bw, ':');

  // value, followed by the separator for the next field
  mnodeWriteString(bw, v);
  tbufWriteChar(bw, ',');
}

89
/*
 * Add the "cpuModel" and "numOfCpu" fields, taken from the first
 * "model name" and "cpu cores" lines of /proc/cpuinfo.
 *
 * Nothing is written when the file cannot be opened. A matching line that
 * has no ':' or (for "cpu cores") no numeric value is skipped instead of
 * being dereferenced blindly.
 */
static void mnodeAddCpuInfo(SBufferWriter* bw) {
  char*   line = NULL;
  size_t  cap = 0;   // buffer capacity owned by tgetline; kept apart from the line length
  int64_t len = 0;   // length of the current line, negative at end of file
  int32_t done = 0;  // bit 0: cpuModel written, bit 1: numOfCpu written

  FILE* fp = fopen("/proc/cpuinfo", "r");
  if (fp == NULL) {
    return;
  }

  while (done != 3 && (len = tgetline(&line, &cap, fp)) >= 0) {
    // strip the line terminator only when there is one
    if (len > 0 && line[len - 1] == '\n') {
      line[len - 1] = '\0';
    }

    bool isModel = ((done & 1) == 0) && strncmp(line, "model name", 10) == 0;
    bool isCores = ((done & 2) == 0) && strncmp(line, "cpu cores", 9) == 0;
    if (!isModel && !isCores) {
      continue;
    }

    // the value follows the ':' separator and its padding
    const char* v = strchr(line, ':');
    if (v == NULL) {
      continue;
    }
    v++;
    while (*v == ' ' || *v == '\t') {
      v++;
    }

    if (isModel) {
      mnodeAddStringField(bw, "cpuModel", v);
      done |= 1;
    } else {
      // the value is emitted as a bare JSON number, so only leading digits are accepted
      size_t digits = 0;
      while (v[digits] >= '0' && v[digits] <= '9') {
        digits++;
      }
      if (digits == 0) {
        continue;
      }
      mnodeWriteString(bw, "numOfCpu");
      tbufWriteChar(bw, ':');
      tbufWrite(bw, v, digits);
      tbufWriteChar(bw, ',');
      done |= 2;
    }
  }

  free(line);
  fclose(fp);
}

119
/*
 * Add the "os" field from the PRETTY_NAME entry of /etc/os-release.
 *
 * Nothing is written when the file cannot be opened or has no usable
 * PRETTY_NAME line. A value enclosed in matching single or double quotes is
 * reported without the quotes.
 */
static void mnodeAddOsInfo(SBufferWriter* bw) {
  char*   line = NULL;
  size_t  cap = 0;  // buffer capacity owned by tgetline; kept apart from the line length
  int64_t len = 0;  // length of the current line, negative at end of file

  FILE* fp = fopen("/etc/os-release", "r");
  if (fp == NULL) {
    return;
  }

  while ((len = tgetline(&line, &cap, fp)) >= 0) {
    // strip the line terminator only when there is one
    if (len > 0 && line[len - 1] == '\n') {
      line[--len] = '\0';
    }
    if (strncmp(line, "PRETTY_NAME", 11) != 0) {
      continue;
    }

    char* p = strchr(line, '=');
    if (p == NULL) {
      continue;  // malformed entry, keep looking
    }
    p++;

    if (*p == '"' || *p == '\'') {
      char  quote = *p++;
      char* end = line + len;
      // drop the closing quote only if it is really there
      if (end > p && end[-1] == quote) {
        end[-1] = '\0';
      }
    }

    mnodeAddStringField(bw, "os", p);
    break;
  }

  free(line);
  fclose(fp);
}

145
/*
 * Add the "memory" field from the MemTotal line of /proc/meminfo.
 *
 * The text after the ':' (leading spaces removed) is reported as a string.
 * Nothing is written when the file cannot be opened or no well-formed
 * MemTotal line is found.
 */
static void mnodeAddMemoryInfo(SBufferWriter* bw) {
  char*   line = NULL;
  size_t  cap = 0;  // buffer capacity owned by tgetline; kept apart from the line length
  int64_t len = 0;  // length of the current line, negative at end of file

  FILE* fp = fopen("/proc/meminfo", "r");
  if (fp == NULL) {
    return;
  }

  while ((len = tgetline(&line, &cap, fp)) >= 0) {
    // strip the line terminator only when there is one
    if (len > 0 && line[len - 1] == '\n') {
      line[len - 1] = '\0';
    }
    if (strncmp(line, "MemTotal", 8) != 0) {
      continue;
    }

    const char* p = strchr(line, ':');
    if (p == NULL) {
      continue;  // malformed entry, keep looking
    }
    p++;
    while (*p == ' ') {
      p++;
    }

    mnodeAddStringField(bw, "memory", p);
    break;
  }

  free(line);
  fclose(fp);
}

168 169 170 171 172
/* Add the "version", "buildInfo", "gitInfo" and "email" string fields, in that order. */
static void mnodeAddVersionInfo(SBufferWriter* bw) {
  const struct {
    const char* key;
    const char* value;
  } fields[] = {
      {"version", version},
      {"buildInfo", buildinfo},
      {"gitInfo", gitinfo},
      {"email", tsTelem.email},
  };

  for (size_t i = 0; i < sizeof(fields) / sizeof(fields[0]); ++i) {
    mnodeAddStringField(bw, fields[i].key, fields[i].value);
  }
}

175
/*
 * Add the counters obtained from mnodeGetLoad as integer fields.
 * Nothing is written when mnodeGetLoad returns a non-zero code.
 */
static void mnodeAddRuntimeInfo(SBufferWriter* bw) {
  SMnodeLoad load = {0};
  if (mnodeGetLoad(&load) != 0) {
    return;
  }

  // field name -> value, emitted in this order
  const struct {
    const char* key;
    int64_t     value;
  } counters[] = {
      {"numOfDnode", load.numOfDnode},
      {"numOfMnode", load.numOfMnode},
      {"numOfVgroup", load.numOfVgroup},
      {"numOfDatabase", load.numOfDatabase},
      {"numOfSuperTable", load.numOfSuperTable},
      {"numOfChildTable", load.numOfChildTable},
      {"numOfColumn", load.numOfColumn},
      {"numOfPoint", load.totalPoints},
      {"totalStorage", load.totalStorage},
      {"compStorage", load.compStorage},
  };

  for (size_t i = 0; i < sizeof(counters) / sizeof(counters[0]); ++i) {
    mnodeAddIntField(bw, counters[i].key, counters[i].value);
  }
}

193
static void mnodeSendTelemetryReport() {
194 195 196
  char     buf[128] = {0};
  uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER);
  if (ip == 0xffffffff) {
197
    mTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno));
198 199 200 201
    return;
  }
  SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0);
  if (fd < 0) {
202
    mTrace("failed to create socket for telemetry, reason:%s", strerror(errno));
203 204 205
    return;
  }

S
Shengliang Guan 已提交
206 207 208
  int64_t clusterId = mnodeGetClusterId();
  char    clusterIdStr[20] = {0};
  snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId);
209

210
  SBufferWriter bw = tbufInitWriter(NULL, false);
211
  mnodeBeginObject(&bw);
S
Shengliang Guan 已提交
212
  mnodeAddStringField(&bw, "instanceId", clusterIdStr);
213 214 215 216 217 218 219
  mnodeAddIntField(&bw, "reportVersion", 1);
  mnodeAddOsInfo(&bw);
  mnodeAddCpuInfo(&bw);
  mnodeAddMemoryInfo(&bw);
  mnodeAddVersionInfo(&bw);
  mnodeAddRuntimeInfo(&bw);
  mnodeCloseObject(&bw);
220 221 222 223 224 225 226 227

  const char* header =
      "POST /report HTTP/1.1\n"
      "Host: " TELEMETRY_SERVER
      "\n"
      "Content-Type: application/json\n"
      "Content-Length: ";

S
Shengliang Guan 已提交
228
  taosWriteSocket(fd, (void*)header, (int32_t)strlen(header));
229 230 231 232 233 234 235 236
  int32_t contLen = (int32_t)(tbufTell(&bw) - 1);
  sprintf(buf, "%d\n\n", contLen);
  taosWriteSocket(fd, buf, (int32_t)strlen(buf));
  taosWriteSocket(fd, tbufGetData(&bw, false), contLen);
  tbufCloseWriter(&bw);

  // read something to avoid nginx error 499
  if (taosReadSocket(fd, buf, 10) < 0) {
237
    mTrace("failed to receive response since %s", strerror(errno));
238 239 240 241 242
  }

  taosCloseSocket(fd);
}

243
static void* mnodeTelemThreadFp(void* param) {
244 245 246 247
  struct timespec end = {0};
  clock_gettime(CLOCK_REALTIME, &end);
  end.tv_sec += 300;  // wait 5 minutes before send first report

248
  setThreadName("mnode-telem");
249

250
  while (!tsTelem.exit) {
251 252
    int32_t         r = 0;
    struct timespec ts = end;
253 254 255
    pthread_mutex_lock(&tsTelem.lock);
    r = pthread_cond_timedwait(&tsTelem.cond, &tsTelem.lock, &ts);
    pthread_mutex_unlock(&tsTelem.lock);
256 257 258
    if (r == 0) break;
    if (r != ETIMEDOUT) continue;

S
Shengliang Guan 已提交
259
    if (mnodeIsMaster()) {
260
      mnodeSendTelemetryReport();
261 262 263 264 265 266 267
    }
    end.tv_sec += REPORT_INTERVAL;
  }

  return NULL;
}

268
static void mnodeGetEmail(char* filepath) {
S
Shengliang Guan 已提交
269
  int32_t fd = taosOpenFileRead(filepath);
270 271 272 273
  if (fd < 0) {
    return;
  }

274
  if (taosReadFile(fd, (void*)tsTelem.email, TSDB_FQDN_LEN) < 0) {
275
    mError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
276 277
  }

S
Shengliang Guan 已提交
278
  taosCloseFile(fd);
279 280
}

281
int32_t mnodeInitTelem() {
282 283
  tsTelem.enable = tsEnableTelemetryReporting;
  if (!tsTelem.enable) return 0;
284

285 286 287 288
  tsTelem.exit = 0;
  pthread_mutex_init(&tsTelem.lock, NULL);
  pthread_cond_init(&tsTelem.cond, NULL);
  tsTelem.email[0] = 0;
289

290
  mnodeGetEmail("/usr/local/taos/email");
291 292 293 294 295

  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

296
  int32_t code = pthread_create(&tsTelem.thread, &attr, mnodeTelemThreadFp, NULL);
297 298
  pthread_attr_destroy(&attr);
  if (code != 0) {
299
    mTrace("failed to create telemetry thread since :%s", strerror(code));
300 301
  }

302
  mInfo("mnode telemetry is initialized");
303 304 305
  return 0;
}

306
/*
 * Stop the reporter thread (if one is running) and release the mutex and
 * condition variable. Does nothing when telemetry was not enabled.
 */
void mnodeCleanupTelem() {
  if (tsTelem.enable) {
    if (taosCheckPthreadValid(tsTelem.thread)) {
      // raise the exit flag and wake the thread while holding the lock
      pthread_mutex_lock(&tsTelem.lock);
      tsTelem.exit = 1;
      pthread_cond_signal(&tsTelem.cond);
      pthread_mutex_unlock(&tsTelem.lock);

      // wait until the thread has left its loop
      pthread_join(tsTelem.thread, NULL);
    }

    pthread_mutex_destroy(&tsTelem.lock);
    pthread_cond_destroy(&tsTelem.cond);
  }
}