mnodeTelem.c 8.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2020 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#include "mnodeTelem.h"
18 19 20 21 22 23 24
#include "tbuffer.h"
#include "tglobal.h"

#define TELEMETRY_SERVER "telemetry.taosdata.com"
#define TELEMETRY_PORT 80
#define REPORT_INTERVAL 86400

25 26 27 28 29 30 31 32 33 34 35 36 37
/*
 * State of the telemetry reporter (one file-local instance).
 *
 * sem_timedwait is NOT implemented on MacOSX, so a
 * pthread_mutex_t/pthread_cond_t pair is used to simulate a timed wait.
 */
static struct {
  bool             enable;  // copied from tsEnableTelemetryReporting in mnodeInitTelem()
  pthread_mutex_t  lock;    // held around the timed wait and while setting 'exit'
  pthread_cond_t   cond;    // signaled by mnodeCleanupTelem() to wake the reporter thread
  volatile int32_t exit;    // set to 1 to ask the reporter thread to stop
  pthread_t        thread;  // reporter thread created in mnodeInitTelem()
  char             email[TSDB_FQDN_LEN];  // filled by mnodeGetEmail(), reported as the "email" field
} tsTelem;

38
/* Starts a JSON object by appending '{' to the buffer writer. */
static void mnodeBeginObject(SBufferWriter* bw) {
  tbufWriteChar(bw, '{');
}
39

40
/*
 * Ends the JSON object currently being written.
 *
 * Field writers in this file append a ',' after every field, so when the last
 * byte written is ',' it is overwritten with '}' instead of appending one.
 * A ',' is then appended after the closing brace; mnodeSendTelemetryReport()
 * leaves that final byte out of the payload it sends.
 *
 * bw: buffer writer holding the JSON text built so far.
 */
static void mnodeCloseObject(SBufferWriter* bw) {
  size_t len = tbufTell(bw);
  // guard len > 0 so an empty buffer never indexes position (size_t)-1
  if (len > 0 && tbufGetData(bw, false)[len - 1] == ',') {
    tbufWriteCharAt(bw, len - 1, '}');
  } else {
    tbufWriteChar(bw, '}');
  }
  tbufWriteChar(bw, ',');
}

#if 0
/* Compiled out: JSON array helpers mirroring the object helpers above. */

/* Starts a JSON array by appending '['. */
static void beginArray(SBufferWriter* bw) { tbufWriteChar(bw, '['); }

/*
 * Ends a JSON array: a trailing ',' is replaced by ']', otherwise ']' is
 * appended; a ',' is then written after the bracket.
 */
static void closeArray(SBufferWriter* bw) {
  size_t pos = tbufTell(bw);
  bool   trailingComma = (tbufGetData(bw, false)[pos - 1] == ',');

  if (!trailingComma) {
    tbufWriteChar(bw, ']');
  } else {
    tbufWriteCharAt(bw, pos - 1, ']');
  }
  tbufWriteChar(bw, ',');
}
#endif

66
/*
 * Writes 'str' as a double-quoted JSON string.
 *
 * '"' and '\\' are backslash-escaped and control characters (< 0x20) are
 * emitted as \u00XX, so values taken from system files (OS name, CPU model,
 * email file) cannot break the surrounding JSON document. All other bytes
 * are copied through unchanged.
 *
 * bw:  buffer writer receiving the text.
 * str: NUL-terminated string to write; must not be NULL.
 */
static void mnodeWriteString(SBufferWriter* bw, const char* str) {
  tbufWriteChar(bw, '"');
  for (const char* p = str; *p != '\0'; ++p) {
    unsigned char c = (unsigned char)*p;
    if (c == '"' || c == '\\') {
      tbufWriteChar(bw, '\\');
      tbufWriteChar(bw, (char)c);
    } else if (c < 0x20) {
      char esc[8];
      snprintf(esc, sizeof(esc), "\\u%04x", (unsigned int)c);
      tbufWrite(bw, esc, strlen(esc));
    } else {
      tbufWriteChar(bw, (char)c);
    }
  }
  tbufWriteChar(bw, '"');
}

72 73
/*
 * Appends an integer member to the JSON object being built, in the form
 * "k":v, (with the trailing comma).
 *
 * bw: buffer writer receiving the text.
 * k:  member name.
 * v:  member value, written in decimal.
 */
static void mnodeAddIntField(SBufferWriter* bw, const char* k, int64_t v) {
  // 32 bytes is more than enough for a sign, 19 digits and the terminator
  char digits[32];
  snprintf(digits, sizeof(digits), "%" PRId64, v);

  mnodeWriteString(bw, k);
  tbufWriteChar(bw, ':');
  tbufWrite(bw, digits, strlen(digits));
  tbufWriteChar(bw, ',');
}

81 82
/*
 * Appends a string member to the JSON object being built, in the form
 * "k":"v", (with the trailing comma).
 *
 * bw: buffer writer receiving the text.
 * k:  member name.
 * v:  member value.
 */
static void mnodeAddStringField(SBufferWriter* bw, const char* k, const char* v) {
  /* name */
  mnodeWriteString(bw, k);
  tbufWriteChar(bw, ':');

  /* value, followed by the separator expected by the next field */
  mnodeWriteString(bw, v);
  tbufWriteChar(bw, ',');
}

88
/*
 * Adds "cpuModel" and "numOfCpu" to the report, taken from the first
 * "model name" and "cpu cores" lines of /proc/cpuinfo.
 *
 * Nothing is written when the file cannot be opened; a matching line without
 * a ':' separator, or a "cpu cores" line without a number, is skipped.
 *
 * bw: buffer writer receiving the JSON members.
 */
static void mnodeAddCpuInfo(SBufferWriter* bw) {
  char*   line = NULL;
  size_t  cap = 0;   // buffer capacity managed by tgetline; never overwritten here
  int64_t len = 0;   // length of the line just read
  int32_t done = 0;  // bit 0: model written, bit 1: core count written

  FILE* fp = fopen("/proc/cpuinfo", "r");
  if (fp == NULL) {
    return;
  }

  while (done != 3 && (len = (int64_t)tgetline(&line, &cap, fp)) != -1) {
    // strip the newline only when there is one (last line may lack it)
    if (len > 0 && line[len - 1] == '\n') {
      line[len - 1] = '\0';
    }

    if (((done & 1) == 0) && strncmp(line, "model name", 10) == 0) {
      const char* v = strchr(line, ':');
      if (v == NULL) {
        continue;
      }
      v++;
      while (*v == ' ' || *v == '\t') {
        v++;
      }
      mnodeAddStringField(bw, "cpuModel", v);
      done |= 1;
    } else if (((done & 2) == 0) && strncmp(line, "cpu cores", 9) == 0) {
      const char* v = strchr(line, ':');
      if (v == NULL) {
        continue;
      }
      v++;

      // emit the value as a number only if it really parses as one
      char* end = NULL;
      long  cores = strtol(v, &end, 10);
      if (end == v) {
        continue;
      }
      mnodeAddIntField(bw, "numOfCpu", cores);
      done |= 2;
    }
  }

  free(line);
  fclose(fp);
}

118
/*
 * Adds the "os" member to the report, taken from the PRETTY_NAME entry of
 * /etc/os-release with surrounding double quotes removed.
 *
 * Nothing is written when the file cannot be opened or has no usable
 * PRETTY_NAME line.
 *
 * bw: buffer writer receiving the JSON member.
 */
static void mnodeAddOsInfo(SBufferWriter* bw) {
  char*   line = NULL;
  size_t  cap = 0;  // buffer capacity managed by tgetline; never overwritten here
  int64_t len = 0;  // length of the line just read

  FILE* fp = fopen("/etc/os-release", "r");
  if (fp == NULL) {
    return;
  }

  while ((len = (int64_t)tgetline(&line, &cap, fp)) != -1) {
    // strip the newline only when there is one (last line may lack it)
    if (len > 0 && line[len - 1] == '\n') {
      line[len - 1] = '\0';
    }
    if (strncmp(line, "PRETTY_NAME", 11) != 0) {
      continue;
    }

    char* p = strchr(line, '=');
    if (p == NULL) {
      continue;
    }
    p++;

    if (*p == '"') {
      p++;
      // drop the closing quote only if it is actually there
      size_t n = strlen(p);
      if (n > 0 && p[n - 1] == '"') {
        p[n - 1] = '\0';
      }
    }

    mnodeAddStringField(bw, "os", p);
    break;
  }

  free(line);
  fclose(fp);
}

144
/*
 * Adds the "memory" member to the report: the text following "MemTotal:" in
 * /proc/meminfo, with leading spaces removed, written as a string.
 *
 * Nothing is written when the file cannot be opened or has no usable
 * MemTotal line.
 *
 * bw: buffer writer receiving the JSON member.
 */
static void mnodeAddMemoryInfo(SBufferWriter* bw) {
  char*   line = NULL;
  size_t  cap = 0;  // buffer capacity managed by tgetline; never overwritten here
  int64_t len = 0;  // length of the line just read

  FILE* fp = fopen("/proc/meminfo", "r");
  if (fp == NULL) {
    return;
  }

  while ((len = (int64_t)tgetline(&line, &cap, fp)) != -1) {
    // strip the newline only when there is one (last line may lack it)
    if (len > 0 && line[len - 1] == '\n') {
      line[len - 1] = '\0';
    }
    if (strncmp(line, "MemTotal", 8) != 0) {
      continue;
    }

    const char* p = strchr(line, ':');
    if (p == NULL) {
      continue;
    }
    p++;
    while (*p == ' ') {
      p++;
    }

    mnodeAddStringField(bw, "memory", p);
    break;
  }

  free(line);
  fclose(fp);
}

167 168 169 170 171
/*
 * Adds the build identification members ("version", "buildInfo", "gitInfo")
 * and the "email" member to the report, in that order.
 *
 * bw: buffer writer receiving the JSON members.
 */
static void mnodeAddVersionInfo(SBufferWriter* bw) {
  const struct {
    const char* key;
    const char* value;
  } fields[] = {
      {"version", version},
      {"buildInfo", buildinfo},
      {"gitInfo", gitinfo},
      {"email", tsTelem.email},
  };

  for (size_t i = 0; i < sizeof(fields) / sizeof(fields[0]); ++i) {
    mnodeAddStringField(bw, fields[i].key, fields[i].value);
  }
}

174
/*
 * Adds the cluster counters obtained from mnodeGetLoad() to the report as
 * integer members. Nothing is written when mnodeGetLoad() returns non-zero.
 *
 * bw: buffer writer receiving the JSON members.
 */
static void mnodeAddRuntimeInfo(SBufferWriter* bw) {
  SMnodeLoad load = {0};
  if (mnodeGetLoad(&load) != 0) {
    return;
  }

  // member name -> counter, in the order they appear in the report
  const struct {
    const char* key;
    int64_t     value;
  } counters[] = {
      {"numOfDnode", load.numOfDnode},
      {"numOfMnode", load.numOfMnode},
      {"numOfVgroup", load.numOfVgroup},
      {"numOfDatabase", load.numOfDatabase},
      {"numOfSuperTable", load.numOfSuperTable},
      {"numOfChildTable", load.numOfChildTable},
      {"numOfColumn", load.numOfColumn},
      {"numOfPoint", load.totalPoints},
      {"totalStorage", load.totalStorage},
      {"compStorage", load.compStorage},
  };

  for (size_t i = 0; i < sizeof(counters) / sizeof(counters[0]); ++i) {
    mnodeAddIntField(bw, counters[i].key, counters[i].value);
  }
}

192
static void mnodeSendTelemetryReport() {
193 194 195
  char     buf[128] = {0};
  uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER);
  if (ip == 0xffffffff) {
196
    mTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno));
197 198 199 200
    return;
  }
  SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0);
  if (fd < 0) {
201
    mTrace("failed to create socket for telemetry, reason:%s", strerror(errno));
202 203 204
    return;
  }

S
Shengliang Guan 已提交
205 206 207
  int64_t clusterId = mnodeGetClusterId();
  char    clusterIdStr[20] = {0};
  snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId);
208

209
  SBufferWriter bw = tbufInitWriter(NULL, false);
210
  mnodeBeginObject(&bw);
S
Shengliang Guan 已提交
211
  mnodeAddStringField(&bw, "instanceId", clusterIdStr);
212 213 214 215 216 217 218
  mnodeAddIntField(&bw, "reportVersion", 1);
  mnodeAddOsInfo(&bw);
  mnodeAddCpuInfo(&bw);
  mnodeAddMemoryInfo(&bw);
  mnodeAddVersionInfo(&bw);
  mnodeAddRuntimeInfo(&bw);
  mnodeCloseObject(&bw);
219 220 221 222 223 224 225 226

  const char* header =
      "POST /report HTTP/1.1\n"
      "Host: " TELEMETRY_SERVER
      "\n"
      "Content-Type: application/json\n"
      "Content-Length: ";

S
Shengliang Guan 已提交
227
  taosWriteSocket(fd, (void*)header, (int32_t)strlen(header));
228 229 230 231 232 233 234 235
  int32_t contLen = (int32_t)(tbufTell(&bw) - 1);
  sprintf(buf, "%d\n\n", contLen);
  taosWriteSocket(fd, buf, (int32_t)strlen(buf));
  taosWriteSocket(fd, tbufGetData(&bw, false), contLen);
  tbufCloseWriter(&bw);

  // read something to avoid nginx error 499
  if (taosReadSocket(fd, buf, 10) < 0) {
236
    mTrace("failed to receive response since %s", strerror(errno));
237 238 239 240 241
  }

  taosCloseSocket(fd);
}

242
static void* mnodeTelemThreadFp(void* param) {
243 244 245 246
  struct timespec end = {0};
  clock_gettime(CLOCK_REALTIME, &end);
  end.tv_sec += 300;  // wait 5 minutes before send first report

247
  setThreadName("mnode-telem");
248

249
  while (!tsTelem.exit) {
250 251
    int32_t         r = 0;
    struct timespec ts = end;
252 253 254
    pthread_mutex_lock(&tsTelem.lock);
    r = pthread_cond_timedwait(&tsTelem.cond, &tsTelem.lock, &ts);
    pthread_mutex_unlock(&tsTelem.lock);
255 256 257
    if (r == 0) break;
    if (r != ETIMEDOUT) continue;

S
Shengliang Guan 已提交
258
    if (mnodeGetStatus() == MN_STATUS_READY) {
259
      mnodeSendTelemetryReport();
260 261 262 263 264 265 266
    }
    end.tv_sec += REPORT_INTERVAL;
  }

  return NULL;
}

267
static void mnodeGetEmail(char* filepath) {
S
Shengliang Guan 已提交
268
  int32_t fd = taosOpenFileRead(filepath);
269 270 271 272
  if (fd < 0) {
    return;
  }

273
  if (taosReadFile(fd, (void*)tsTelem.email, TSDB_FQDN_LEN) < 0) {
274
    mError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno));
275 276
  }

S
Shengliang Guan 已提交
277
  taosCloseFile(fd);
278 279
}

280
int32_t mnodeInitTelem() {
281 282
  tsTelem.enable = tsEnableTelemetryReporting;
  if (!tsTelem.enable) return 0;
283

284 285 286 287
  tsTelem.exit = 0;
  pthread_mutex_init(&tsTelem.lock, NULL);
  pthread_cond_init(&tsTelem.cond, NULL);
  tsTelem.email[0] = 0;
288

289
  mnodeGetEmail("/usr/local/taos/email");
290 291 292 293 294

  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

295
  int32_t code = pthread_create(&tsTelem.thread, &attr, mnodeTelemThreadFp, NULL);
296 297
  pthread_attr_destroy(&attr);
  if (code != 0) {
298
    mTrace("failed to create telemetry thread since :%s", strerror(code));
299 300
  }

301
  mInfo("mnode telemetry is initialized");
302 303 304
  return 0;
}

305
/*
 * Shuts telemetry reporting down.
 *
 * Does nothing when telemetry was not enabled. Otherwise, if the reporter
 * thread handle is valid, the exit flag is set and the condition signaled
 * under the lock, the thread is joined, and finally the mutex and condition
 * variable are destroyed.
 */
void mnodeCleanupTelem() {
  if (tsTelem.enable) {
    if (taosCheckPthreadValid(tsTelem.thread)) {
      // request the stop and wake the thread from its timed wait
      pthread_mutex_lock(&tsTelem.lock);
      tsTelem.exit = 1;
      pthread_cond_signal(&tsTelem.cond);
      pthread_mutex_unlock(&tsTelem.lock);

      pthread_join(tsTelem.thread, NULL);
    }

    pthread_mutex_destroy(&tsTelem.lock);
    pthread_cond_destroy(&tsTelem.cond);
  }
}