tscSystem.c 11.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20
#include "taosmsg.h"
#include "tcache.h"
#include "tlog.h"
#include "trpc.h"
S
slguan 已提交
21
#include "taosdef.h"
H
hzcheng 已提交
22 23 24 25 26
#include "tsocket.h"
#include "tsystem.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
27
#include "tsched.h"
H
hzcheng 已提交
28
#include "tsclient.h"
S
slguan 已提交
29

H
hzcheng 已提交
30 31 32 33 34 35
// global, not configurable
// Opaque handles shared by the client. None has an initializer, so as
// file-scope objects they start out NULL.

// RPC connection to vnodes; opened on demand by tscInitRpc() through rpcOpen().
void *  pVnodeConn;
// Not referenced in the visible part of this file.
void *  pVMeterConn;
// RPC connection to the management node; opened on demand by tscInitRpc().
void *  pTscMgmtConn;
// Not referenced in the visible part of this file.
void *  pSlaveConn;
// Cache handle; created by taosCacheInit() in taos_init_imp() if still NULL.
void *  tscCacheHandle;
36
int32_t globalCode = 0;
H
hzcheng 已提交
37 38 39 40
// Set to 1 near the end of taos_init_imp(); stays 0 if scheduler setup fails.
int     initialized = 0;
// Assigned a rand() value in taos_init_imp().
int     slaveIndex;
// Timer controller returned by taosTmrInit() in taos_init_imp().
void *  tscTmr;
// Scheduler handle returned by taosInitScheduler() in taos_init_imp().
void *  tscQhandle;
S
slguan 已提交
41
void *  tscCheckDiskUsageTmr;
H
hzcheng 已提交
42 43 44 45 46
// Computed in taos_init_imp() as tsRpcHeadSize + sizeof(SShellSubmitMsg).
int     tsInsertHeadSize;

// Defined elsewhere. taos_init_imp() treats 0 as "not embedded": only then does
// it read the configuration, open the log and start the disk-usage timer.
extern int            tscEmbedded;
// Scheduler thread count; derived from the core count in taos_init_imp(), minimum 2.
int                   tscNumOfThreads;
// Once-control used by taos_init() so that taos_init_imp() runs a single time.
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
S
slguan 已提交
47
static pthread_mutex_t tscMutex;
H
hzcheng 已提交
48

L
lihui 已提交
49 50 51
// When non-zero, taos_init_imp() calls taosInitNote() to set up the "tsc_note" note.
extern int  tsTscEnableRecordSql;
// Passed to taosInitLog(); taos_init_imp() also passes one tenth of it to taosInitNote().
extern int  tsNumOfLogLines;
// Defined elsewhere; declared here so taos_init_imp() can call it.
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
L
lihui 已提交
52
void deltaToUtcInitOnce();
L
lihui 已提交
53

S
slguan 已提交
54 55 56 57 58
/*
 * Timer callback: refreshes the disk statistics through taosGetDisk() and then
 * re-arms itself on tscTmr with a delay argument of 1000 (presumably
 * milliseconds -- confirm against taosTmrReset).
 *
 * Neither argument is used; they exist only to match the timer callback shape.
 */
void tscCheckDiskUsage(void *para, void *unused) {
  (void)para;
  (void)unused;

  taosGetDisk();

  taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
}

S
slguan 已提交
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
/*
 * Open the client-side RPC connections if they are not open yet.
 *
 * Two connections are managed under tscMutex:
 *   - pVnodeConn   ("TSC-vnode"): uses tscNumOfThreads threads and the
 *                  caller-supplied user name.
 *   - pTscMgmtConn ("TSC-mgmt"):  single thread, idleTime of 2000, and the
 *                  fixed user name "root".
 * A connection that is already open (non-NULL handle) is left untouched, so
 * the function can be called repeatedly.
 *
 * @param user    user name stored in the vnode connection settings
 * @param secret  plain-text password; it is passed through taosEncryptPass()
 *                and the encrypted form is handed to rpcOpen(). Must not be NULL.
 * @return 0 on success, -1 if secret is NULL or either rpcOpen() call fails.
 */
int32_t tscInitRpc(const char *user, const char *secret) {
  SRpcInit rpcInit;
  char     secretEncrypt[32] = {0};
  int32_t  code = 0;

  // strlen(NULL) is undefined behaviour; reject it instead of crashing.
  if (secret == NULL) {
    tscError("failed to init rpc, secret is NULL");
    return -1;
  }

  taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);

  pthread_mutex_lock(&tscMutex);

  if (pVnodeConn == NULL) {
    memset(&rpcInit, 0, sizeof(rpcInit));
    rpcInit.localIp = tsLocalIp;
    rpcInit.localPort = 0;
    rpcInit.label = "TSC-vnode";
    rpcInit.numOfThreads = tscNumOfThreads;
    rpcInit.cfp = tscProcessMsgFromServer;
    rpcInit.sessions = tsMaxVnodeConnections;
    rpcInit.connType = TAOS_CONN_CLIENT;
    rpcInit.user = (char*)user;
    rpcInit.ckey = "key";
    // NOTE(review): secretEncrypt lives on this stack frame; this assumes
    // rpcOpen() copies the secret rather than keeping the pointer -- confirm.
    rpcInit.secret = secretEncrypt;

    pVnodeConn = rpcOpen(&rpcInit);
    if (pVnodeConn == NULL) {
      tscError("failed to init connection to vnode");
      code = -1;
      goto _done;
    }
  }

  if (pTscMgmtConn == NULL) {
    memset(&rpcInit, 0, sizeof(rpcInit));
    rpcInit.localIp = tsLocalIp;
    rpcInit.localPort = 0;
    rpcInit.label = "TSC-mgmt";
    rpcInit.numOfThreads = 1;
    rpcInit.cfp = tscProcessMsgFromServer;
    rpcInit.sessions = tsMaxMgmtConnections;
    rpcInit.connType = TAOS_CONN_CLIENT;
    rpcInit.idleTime = 2000;
    rpcInit.user = "root";
    rpcInit.ckey = "key";
    rpcInit.secret = secretEncrypt;

    pTscMgmtConn = rpcOpen(&rpcInit);
    if (pTscMgmtConn == NULL) {
      tscError("failed to init connection to mgmt");
      code = -1;
      goto _done;
    }
  }

_done:
  // Single exit so the mutex is released on every path.
  pthread_mutex_unlock(&tscMutex);
  return code;
}

H
hzcheng 已提交
112 113 114 115
void taos_init_imp() {
  char        temp[128];
  struct stat dirstat;

S
slguan 已提交
116
  pthread_mutex_init(&tscMutex, NULL);
H
hzcheng 已提交
117
  srand(taosGetTimestampSec());
L
lihui 已提交
118
  deltaToUtcInitOnce();
H
hzcheng 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135

  if (tscEmbedded == 0) {
    /*
     * set localIp = 0
     * means unset tsLocalIp in client
     * except read from config file
     */
    strcpy(tsLocalIp, "0.0.0.0");

    // Read global configuration.
    tsReadGlobalLogConfig();

    // For log directory
    if (stat(logDir, &dirstat) < 0) mkdir(logDir, 0755);

    sprintf(temp, "%s/taoslog", logDir);
    if (taosInitLog(temp, tsNumOfLogLines, 10) < 0) {
S
slguan 已提交
136
      printf("failed to open log file in directory:%s\n", logDir);
H
hzcheng 已提交
137 138 139 140 141 142 143 144 145
    }

    tsReadGlobalConfig();
    tsPrintGlobalConfig();

    tscTrace("starting to initialize TAOS client ...");
    tscTrace("Local IP address is:%s", tsLocalIp);
  }

L
lihui 已提交
146 147
  taosSetCoreDump();

L
lihui 已提交
148 149 150
  if (tsTscEnableRecordSql != 0) {
    taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note");
  }
S
slguan 已提交
151

S
slguan 已提交
152
  tscMgmtIpList.inUse = 0;
S
slguan 已提交
153
  tscMgmtIpList.port = tsMnodeShellPort;
S
slguan 已提交
154 155
  tscMgmtIpList.numOfIps = 1;
  tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
S
slguan 已提交
156

S
slguan 已提交
157
  if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) {
S
slguan 已提交
158 159
    tscMgmtIpList.numOfIps = 2;
    tscMgmtIpList.ip[1] = inet_addr(tsSecondIp);
S
slguan 已提交
160 161
  }

H
hzcheng 已提交
162 163 164 165 166 167 168 169 170 171 172 173 174
  tscInitMsgs();
  slaveIndex = rand();
  int queueSize = tsMaxVnodeConnections + tsMaxMeterConnections + tsMaxMgmtConnections + tsMaxMgmtConnections;

  if (tscEmbedded == 0) {
    tscNumOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 2.0;
  } else {
    tscNumOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0;
  }

  if (tscNumOfThreads < 2) tscNumOfThreads = 2;

  tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc");
S
slguan 已提交
175 176 177 178
  if (NULL == tscQhandle) {
    tscError("failed to init scheduler");
    return;
  }
H
hzcheng 已提交
179 180

  tscTmr = taosTmrInit(tsMaxMgmtConnections * 2, 200, 60000, "TSC");
S
slguan 已提交
181 182
  if(0 == tscEmbedded){
    taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr);      
S
slguan 已提交
183
  }
H
hzcheng 已提交
184 185 186 187
  int64_t refreshTime = tsMetricMetaKeepTimer < tsMeterMetaKeepTimer ? tsMetricMetaKeepTimer : tsMeterMetaKeepTimer;
  refreshTime = refreshTime > 2 ? 2 : refreshTime;
  refreshTime = refreshTime < 1 ? 1 : refreshTime;

188
  if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime);
H
hzcheng 已提交
189 190

  initialized = 1;
S
slguan 已提交
191
  tscTrace("client is initialized successfully");
H
hzcheng 已提交
192 193 194 195 196
  tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg);
}

/*
 * Public entry point for client initialization. Safe to call any number of
 * times: pthread_once() guarantees taos_init_imp() runs only on the first call.
 */
void taos_init() {
  pthread_once(&tscinit, taos_init_imp);
}

weixin_48148422's avatar
weixin_48148422 已提交
197 198
/*
 * Apply one client option.
 *
 * For every supported option the matching SGlobalConfig entry is looked up. The
 * new value is applied only while cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION;
 * otherwise a warning is logged and the existing value is kept (for
 * TSDB_OPTION_SOCKET_TYPE the request is ignored silently in that case).
 *
 * @param option  which setting to change
 * @param pStr    new value as a NUL-terminated string; must not be NULL
 * @return 0 when the option was handled (including the "kept existing value"
 *         case), -1 for a NULL value, an unknown option, or a locale/charset
 *         string that is empty or longer than TSDB_LOCALE_LEN.
 */
static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
  SGlobalConfig *cfg = NULL;

  // Every branch below reads pStr (strlen/atoi/strncpy); NULL would be UB.
  if (pStr == NULL) {
    tscError("Invalid option %d, value is NULL", option);
    return -1;
  }

  switch (option) {
    case TSDB_OPTION_CONFIGDIR:
      cfg = tsGetConfigOption("configDir");
      assert(cfg != NULL);

      if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
        strncpy(configDir, pStr, TSDB_FILENAME_LEN);
        // strncpy() does not terminate when the source fills the whole span.
        configDir[TSDB_FILENAME_LEN - 1] = 0;
        cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
        tscPrint("set config file directory:%s", pStr);
      } else {
        tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr,
                tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
      }
      break;

    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
      cfg = tsGetConfigOption("shellActivityTimer");
      assert(cfg != NULL);

      if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
        tsShellActivityTimer = atoi(pStr);
        // Clamp to the accepted range [1, 3600].
        if (tsShellActivityTimer < 1) tsShellActivityTimer = 1;
        if (tsShellActivityTimer > 3600) tsShellActivityTimer = 3600;
        cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
        tscPrint("set shellActivityTimer:%d", tsShellActivityTimer);
      } else {
        // %d needs the integer value, not the pointer stored in cfg->ptr.
        tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, pStr,
                tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr);
      }
      break;

    case TSDB_OPTION_LOCALE: {  // set locale
      cfg = tsGetConfigOption("locale");
      assert(cfg != NULL);

      size_t len = strlen(pStr);
      if (len == 0 || len > TSDB_LOCALE_LEN) {
        tscPrint("Invalid locale:%s, use default", pStr);
        return -1;
      }

      if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
        char sep = '.';

        if (strlen(tsLocale) == 0) { // locale does not set yet
          // setlocale() returns NULL on failure; only record a real result.
          char *defaultLocale = setlocale(LC_CTYPE, "");
          if (defaultLocale != NULL) {
            strncpy(tsLocale, defaultLocale, tListLen(tsLocale));
            tsLocale[tListLen(tsLocale) - 1] = 0;
          }
        }

        // set the user specified locale
        char *locale = setlocale(LC_CTYPE, pStr);

        if (locale != NULL) {
          tscPrint("locale set, prev locale:%s, new locale:%s", tsLocale, locale);
          cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
        } else { // setting the user-specified locale failed, fall back to the recorded one
          locale = setlocale(LC_CTYPE, tsLocale);
          tscPrint("failed to set locale:%s, current locale:%s", pStr, tsLocale);
        }

        // The fallback can fail too; in that case keep tsLocale as it is.
        if (locale != NULL) {
          strncpy(tsLocale, locale, tListLen(tsLocale));
          tsLocale[tListLen(tsLocale) - 1] = 0;
        }

        // The charset, if any, is the part of the locale name after the last '.'.
        char *charset = strrchr(tsLocale, sep);
        if (charset != NULL) {
          charset += 1;

          // The result of taosCharsetReplace() is released with free() below.
          charset = taosCharsetReplace(charset);

          if (taosValidateEncodec(charset)) {
            if (strlen(tsCharset) == 0) {
              tscPrint("charset set:%s", charset);
            } else {
              tscPrint("charset changed from %s to %s", tsCharset, charset);
            }

            strncpy(tsCharset, charset, tListLen(tsCharset));
            tsCharset[tListLen(tsCharset) - 1] = 0;
            cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;

          } else {
            tscPrint("charset:%s is not valid in locale, charset remains:%s", charset, tsCharset);
          }

          free(charset);
        } else { // it may be windows system
          tscPrint("charset remains:%s", tsCharset);
        }
      } else {
        tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr,
                tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
      }
      break;
    }

    case TSDB_OPTION_CHARSET: {
      /* set charset will override the value of charset, assigned during system locale changed */
      cfg = tsGetConfigOption("charset");
      assert(cfg != NULL);

      size_t len = strlen(pStr);
      if (len == 0 || len > TSDB_LOCALE_LEN) {
        tscPrint("failed to set charset:%s", pStr);
        return -1;
      }

      if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
        if (taosValidateEncodec(pStr)) {
          if (strlen(tsCharset) == 0) {
            tscPrint("charset is set:%s", pStr);
          } else {
            tscPrint("charset changed from %s to %s", tsCharset, pStr);
          }

          strncpy(tsCharset, pStr, tListLen(tsCharset));
          tsCharset[tListLen(tsCharset) - 1] = 0;
          cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
        } else {
          tscPrint("charset:%s not valid", pStr);
        }
      } else {
        tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr,
                tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
      }

      break;
    }

    case TSDB_OPTION_TIMEZONE:
      cfg = tsGetConfigOption("timezone");
      assert(cfg != NULL);

      if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
        // NOTE(review): unbounded copy; the size of tsTimezone is not visible
        // here, so a length check should be added where it is declared.
        strcpy(tsTimezone, pStr);
        tsSetTimeZone();
        cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
        tscTrace("timezone set:%s, input:%s by taos_options", tsTimezone, pStr);
      } else {
        tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr,
                tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
      }
      break;

    case TSDB_OPTION_SOCKET_TYPE:
      cfg = tsGetConfigOption("sockettype");
      assert(cfg != NULL);

      if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
        // The value is stored as given; the 'tcp'/'udp' check is disabled:
//        if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) {
//          tscError("only 'tcp' or 'udp' allowed for configuring the socket type");
//          return -1;
//        }

        strncpy(tsSocketType, pStr, tListLen(tsSocketType));
        tsSocketType[tListLen(tsSocketType) - 1] = 0;
        cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
        tscPrint("socket type is set:%s", tsSocketType);
      }
      break;

    default:
      // TODO return the correct error code to client in the format for taos_errstr()
      tscError("Invalid option %d", option);
      return -1;
  }

  return 0;
}
weixin_48148422's avatar
weixin_48148422 已提交
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379


/*
 * Public wrapper around taos_options_imp().
 *
 * Calls are serialized with a function-local flag that is taken with an atomic
 * compare-and-exchange (0 -> 1) and released with an atomic store of 0. While
 * waiting, every 1000th failed attempt logs a message and yields the CPU.
 *
 * @param option  option identifier forwarded to taos_options_imp()
 * @param arg     option value; interpreted as a C string
 * @return whatever taos_options_imp() returns
 *
 * Variadic arguments after `arg` are not read.
 */
int taos_options(TSDB_OPTION option, const void *arg, ...) {
  static int32_t lock = 0;

  int spins = 0;
  while (atomic_val_compare_exchange_32(&lock, 0, 1) != 0) {
    ++spins;
    if (spins % 1000 != 0) {
      continue;
    }

    tscPrint("haven't acquire lock after spin %d times.", spins);
    sched_yield();
  }

  const char *value = (const char*)arg;
  int         result = taos_options_imp(option, value);

  atomic_store_32(&lock, 0);
  return result;
}