tscSystem.c 12.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17
#include "taosmsg.h"
18
#include "tref.h"
H
hzcheng 已提交
19
#include "trpc.h"
S
TD-2371  
Shengliang Guan 已提交
20
#include "tnote.h"
H
hzcheng 已提交
21 22
#include "ttimer.h"
#include "tutil.h"
S
slguan 已提交
23
#include "tsched.h"
S
slguan 已提交
24
#include "tscLog.h"
25
#include "tscUtil.h"
H
hzcheng 已提交
26
#include "tsclient.h"
S
slguan 已提交
27 28 29 30
#include "tglobal.h"
#include "tconfig.h"
#include "ttimezone.h"
#include "tlocale.h"
S
slguan 已提交
31

H
hzcheng 已提交
32
// global, not configurable
33 34
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED    0
H
hzcheng 已提交
35

36 37 38 39 40 41 42 43 44 45
int32_t    sentinel = TSC_VAR_NOT_RELEASE;

SHashObj  *tscVgroupMap;         // hash map to keep the global vgroup info
SHashObj  *tscTableMetaInfo;     // table meta info
int32_t    tscObjRef = -1;
void      *tscTmr;
void      *tscQhandle;
int32_t    tscRefId = -1;
int32_t    tscNumOfObj = 0;         // number of sqlObj in current process.
static void  *tscCheckDiskUsageTmr;
Y
yihaoDeng 已提交
46 47 48
void      *tscRpcCache;            // cache to keep rpc obj
int32_t   tscNumOfThreads = 1;     // num of rpc threads  
static    pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently 
49
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
L
lihui 已提交
50

S
TD-1413  
Shengliang Guan 已提交
51
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) {
S
slguan 已提交
52 53 54
  taosGetDisk();
  taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
}
Y
yihaoDeng 已提交
55 56 57
void tscFreeRpcObj(void *param) {
  assert(param);
  SRpcObj *pRpcObj = (SRpcObj *)(param);
Y
yihaoDeng 已提交
58
  tscDebug("free rpcObj:%p and free pDnodeConn: %p", pRpcObj, pRpcObj->pDnodeConn);
Y
yihaoDeng 已提交
59 60
  rpcClose(pRpcObj->pDnodeConn);
}
S
slguan 已提交
61

Y
yihaoDeng 已提交
62 63 64
void tscReleaseRpc(void *param)  {
  if (param == NULL) {
    return;
S
slguan 已提交
65
  }
Y
yihaoDeng 已提交
66
  pthread_mutex_lock(&rpcObjMutex);
Y
yihaoDeng 已提交
67
  taosCacheRelease(tscRpcCache, (void *)&param, false); 
Y
yihaoDeng 已提交
68 69
  pthread_mutex_unlock(&rpcObjMutex);
} 
S
slguan 已提交
70

Y
yihaoDeng 已提交
71
int32_t tscAcquireRpc(const char *key, const char *user, const char *secretEncrypt, void **ppRpcObj) {
Y
yihaoDeng 已提交
72 73
  pthread_mutex_lock(&rpcObjMutex);

Y
yihaoDeng 已提交
74
  SRpcObj *pRpcObj = (SRpcObj *)taosCacheAcquireByKey(tscRpcCache, key, strlen(key));
Y
yihaoDeng 已提交
75 76 77 78 79
  if (pRpcObj != NULL) {
    *ppRpcObj = pRpcObj;   
    pthread_mutex_unlock(&rpcObjMutex);
    return 0;
  } 
S
slguan 已提交
80

Y
yihaoDeng 已提交
81 82 83 84
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = 0;
  rpcInit.label = "TSC";
Y
yihaoDeng 已提交
85
  rpcInit.numOfThreads = tscNumOfThreads * 2;    
Y
yihaoDeng 已提交
86 87 88 89
  rpcInit.cfp = tscProcessMsgFromServer;
  rpcInit.sessions = tsMaxConnections;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = (char *)user;
Y
yihaoDeng 已提交
90
  rpcInit.idleTime = tsShellActivityTimer * 1000;; 
Y
yihaoDeng 已提交
91 92 93 94 95 96 97 98 99 100 101 102 103
  rpcInit.ckey = "key"; 
  rpcInit.spi = 1; 
  rpcInit.secret = (char *)secretEncrypt;

  SRpcObj rpcObj;
  memset(&rpcObj, 0, sizeof(rpcObj));
  strncpy(rpcObj.key, key, strlen(key));
  rpcObj.pDnodeConn = rpcOpen(&rpcInit);
  if (rpcObj.pDnodeConn == NULL) {
    pthread_mutex_unlock(&rpcObjMutex);
    tscError("failed to init connection to TDengine");
    return -1;
  } 
Y
fix bug  
yihaoDeng 已提交
104
  pRpcObj = taosCachePut(tscRpcCache, rpcObj.key, strlen(rpcObj.key), &rpcObj, sizeof(rpcObj), 1000*2);   
Y
yihaoDeng 已提交
105 106 107 108 109 110 111 112
  if (pRpcObj == NULL) {
    rpcClose(rpcObj.pDnodeConn);
    pthread_mutex_unlock(&rpcObjMutex);
    return -1;
  } 

  *ppRpcObj  = pRpcObj;
  pthread_mutex_unlock(&rpcObjMutex);
S
slguan 已提交
113 114 115
  return 0;
}

S
Shengliang Guan 已提交
116
void taos_init_imp(void) {
H
Haojun Liao 已提交
117
  char temp[128]  = {0};
118
  
119
  errno = TSDB_CODE_SUCCESS;
H
hzcheng 已提交
120
  srand(taosGetTimestampSec());
L
lihui 已提交
121
  deltaToUtcInitOnce();
H
hzcheng 已提交
122 123 124 125

  if (tscEmbedded == 0) {

    // Read global configuration.
S
slguan 已提交
126 127
    taosInitGlobalCfg();
    taosReadGlobalLogCfg();
H
hzcheng 已提交
128 129

    // For log directory
130 131 132
    if (mkdir(tsLogDir, 0755) != 0 && errno != EEXIST) {
      printf("failed to create log dir:%s\n", tsLogDir);
    }
H
hzcheng 已提交
133

S
slguan 已提交
134
    sprintf(temp, "%s/taoslog", tsLogDir);
H
hzcheng 已提交
135
    if (taosInitLog(temp, tsNumOfLogLines, 10) < 0) {
S
slguan 已提交
136
      printf("failed to open log file in directory:%s\n", tsLogDir);
H
hzcheng 已提交
137 138
    }

S
slguan 已提交
139 140
    taosReadGlobalCfg();
    taosCheckGlobalCfg();
S
TD-2371  
Shengliang Guan 已提交
141
    taosInitNotes();
H
hzcheng 已提交
142

143
    rpcInit();
144 145
    tscDebug("starting to initialize TAOS client ...");
    tscDebug("Local End Point is:%s", tsLocalEp);
H
hzcheng 已提交
146 147
  }

L
lihui 已提交
148
  taosSetCoreDump();
149
  tscInitMsgsFp();
150
  int queueSize = tsMaxConnections*2;
H
hzcheng 已提交
151

152
  double factor = (tscEmbedded == 0)? 2.0:4.0;
Y
yihaoDeng 已提交
153
  tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
H
Haojun Liao 已提交
154 155 156
  if (tscNumOfThreads < 2) {
    tscNumOfThreads = 2;
  }
Y
patch  
yihaoDeng 已提交
157
  taosTmrThreads = tscNumOfThreads;
H
hzcheng 已提交
158 159

  tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc");
S
slguan 已提交
160 161 162 163
  if (NULL == tscQhandle) {
    tscError("failed to init scheduler");
    return;
  }
H
hzcheng 已提交
164

165
  tscTmr = taosTmrInit(tsMaxConnections * 2, 200, 60000, "TSC");
H
Hongze Cheng 已提交
166 167
  if(0 == tscEmbedded){
    taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr);      
S
slguan 已提交
168
  }
H
hzcheng 已提交
169

170
  if (tscTableMetaInfo == NULL) {
H
Haojun Liao 已提交
171
    tscObjRef  = taosOpenRef(40960, tscFreeRegisteredSqlObj);
172
    tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
173 174
    tscTableMetaInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    tscDebug("TableMeta:%p", tscTableMetaInfo);
175
  }
Y
yihaoDeng 已提交
176 177
   
  int refreshTime = 5;
Y
yihaoDeng 已提交
178 179
  tscRpcCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, true, tscFreeRpcObj, "rpcObj");
  pthread_mutex_init(&rpcObjMutex, NULL);
H
hzcheng 已提交
180

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
181 182
  tscRefId = taosOpenRef(200, tscCloseTscObj);

H
Haojun Liao 已提交
183 184 185 186
  // in other language APIs, taos_cleanup is not available yet.
  // So, to make sure taos_cleanup will be invoked to clean up the allocated
  // resource to suppress the valgrind warning.
  atexit(taos_cleanup);
187
  tscDebug("client is initialized successfully");
H
hzcheng 已提交
188 189 190 191
}

void taos_init() { pthread_once(&tscinit, taos_init_imp); }

H
Haojun Liao 已提交
192
// this function may be called by user or system, or by both simultaneously.
S
Shengliang Guan 已提交
193
void taos_cleanup(void) {
H
Haojun Liao 已提交
194
  tscDebug("start to cleanup client environment");
H
Haojun Liao 已提交
195

196 197
  if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
    return;
198
  }
H
Haojun Liao 已提交
199

200 201
  taosHashCleanup(tscTableMetaInfo);
  tscTableMetaInfo = NULL;
H
Haojun Liao 已提交
202

203 204 205 206 207 208 209 210 211 212 213 214 215 216
  taosHashCleanup(tscVgroupMap);
  tscVgroupMap = NULL;

  int32_t id = tscObjRef;
  tscObjRef = -1;
  taosCloseRef(id);

  void* p = tscQhandle;
  tscQhandle = NULL;
  taosCleanUpScheduler(p);

  id = tscRefId;
  tscRefId = -1;
  taosCloseRef(id);
H
Haojun Liao 已提交
217 218

  taosCleanupKeywordsTable();
H
Haojun Liao 已提交
219

Y
yihaoDeng 已提交
220 221 222 223 224
  p = tscRpcCache; 
  tscRpcCache = NULL;
  
  if (p != NULL) {
    taosCacheCleanup(p); 
Y
yihaoDeng 已提交
225
    pthread_mutex_destroy(&rpcObjMutex);
H
Haojun Liao 已提交
226
  }
227

Y
TD-2884  
yihaoDeng 已提交
228 229 230 231
  if (tscEmbedded == 0) {
    rpcCleanup();
    taosCloseLog();
  };
H
Haojun Liao 已提交
232

233 234 235
  p = tscTmr;
  tscTmr = NULL;
  taosTmrCleanUp(p);
236 237
}

weixin_48148422's avatar
weixin_48148422 已提交
238
static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
S
slguan 已提交
239
  SGlobalCfg *cfg = NULL;
H
hzcheng 已提交
240 241 242

  switch (option) {
    case TSDB_OPTION_CONFIGDIR:
S
slguan 已提交
243
      cfg = taosGetConfigOption("configDir");
H
hjxilinx 已提交
244 245
      assert(cfg != NULL);
    
S
slguan 已提交
246
      if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
B
Bomin Zhang 已提交
247
        tstrncpy(configDir, pStr, TSDB_FILENAME_LEN);
S
slguan 已提交
248
        cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
249
        tscInfo("set config file directory:%s", pStr);
H
hzcheng 已提交
250
      } else {
weixin_48148422's avatar
weixin_48148422 已提交
251 252
        tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr,
                tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
H
hzcheng 已提交
253 254
      }
      break;
S
slguan 已提交
255

H
hzcheng 已提交
256
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
S
slguan 已提交
257
      cfg = taosGetConfigOption("shellActivityTimer");
H
hjxilinx 已提交
258 259
      assert(cfg != NULL);
    
S
slguan 已提交
260
      if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
weixin_48148422's avatar
weixin_48148422 已提交
261
        tsShellActivityTimer = atoi(pStr);
H
hzcheng 已提交
262 263
        if (tsShellActivityTimer < 1) tsShellActivityTimer = 1;
        if (tsShellActivityTimer > 3600) tsShellActivityTimer = 3600;
S
slguan 已提交
264
        cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
265
        tscInfo("set shellActivityTimer:%d", tsShellActivityTimer);
H
hzcheng 已提交
266
      } else {
weixin_48148422's avatar
weixin_48148422 已提交
267
        tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, pStr,
268
                tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr);
H
hzcheng 已提交
269 270
      }
      break;
S
slguan 已提交
271

H
hzcheng 已提交
272
    case TSDB_OPTION_LOCALE: {  // set locale
S
slguan 已提交
273
      cfg = taosGetConfigOption("locale");
H
hjxilinx 已提交
274 275
      assert(cfg != NULL);
  
H
hzcheng 已提交
276 277
      size_t len = strlen(pStr);
      if (len == 0 || len > TSDB_LOCALE_LEN) {
278
        tscInfo("Invalid locale:%s, use default", pStr);
H
hzcheng 已提交
279 280 281
        return -1;
      }

S
slguan 已提交
282
      if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
H
hzcheng 已提交
283 284
        char sep = '.';

S
slguan 已提交
285 286
        if (strlen(tsLocale) == 0) { // locale does not set yet
          char* defaultLocale = setlocale(LC_CTYPE, "");
S
Shengliang Guan 已提交
287
          tstrncpy(tsLocale, defaultLocale, TSDB_LOCALE_LEN);
S
slguan 已提交
288 289 290
        }

        // set the user specified locale
H
hzcheng 已提交
291 292 293
        char *locale = setlocale(LC_CTYPE, pStr);

        if (locale != NULL) {
294
          tscInfo("locale set, prev locale:%s, new locale:%s", tsLocale, locale);
S
slguan 已提交
295
          cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
S
slguan 已提交
296
        } else { // set the user-specified localed failed, use default LC_CTYPE as current locale
S
slguan 已提交
297
          locale = setlocale(LC_CTYPE, tsLocale);
298
          tscInfo("failed to set locale:%s, current locale:%s", pStr, tsLocale);
H
hzcheng 已提交
299 300
        }

S
Shengliang Guan 已提交
301
        tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN);
H
hzcheng 已提交
302 303 304 305 306 307 308 309

        char *charset = strrchr(tsLocale, sep);
        if (charset != NULL) {
          charset += 1;

          charset = taosCharsetReplace(charset);

          if (taosValidateEncodec(charset)) {
S
slguan 已提交
310
            if (strlen(tsCharset) == 0) {
311
              tscInfo("charset set:%s", charset);
S
slguan 已提交
312
            } else {
313
              tscInfo("charset changed from %s to %s", tsCharset, charset);
S
slguan 已提交
314 315
            }

S
Shengliang Guan 已提交
316
            tstrncpy(tsCharset, charset, TSDB_LOCALE_LEN);
S
slguan 已提交
317
            cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
S
slguan 已提交
318

H
hzcheng 已提交
319
          } else {
320
            tscInfo("charset:%s is not valid in locale, charset remains:%s", charset, tsCharset);
H
hzcheng 已提交
321
          }
S
slguan 已提交
322

H
hzcheng 已提交
323
          free(charset);
S
slguan 已提交
324
        } else { // it may be windows system
325
          tscInfo("charset remains:%s", tsCharset);
H
hzcheng 已提交
326 327
        }
      } else {
weixin_48148422's avatar
weixin_48148422 已提交
328 329
        tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr,
                tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
H
hzcheng 已提交
330 331 332 333 334 335
      }
      break;
    }

    case TSDB_OPTION_CHARSET: {
      /* set charset will override the value of charset, assigned during system locale changed */
S
slguan 已提交
336
      cfg = taosGetConfigOption("charset");
H
hjxilinx 已提交
337 338
      assert(cfg != NULL);
      
H
hzcheng 已提交
339 340
      size_t len = strlen(pStr);
      if (len == 0 || len > TSDB_LOCALE_LEN) {
341
        tscInfo("failed to set charset:%s", pStr);
H
hzcheng 已提交
342 343 344
        return -1;
      }

S
slguan 已提交
345
      if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
H
hzcheng 已提交
346
        if (taosValidateEncodec(pStr)) {
S
slguan 已提交
347
          if (strlen(tsCharset) == 0) {
348
            tscInfo("charset is set:%s", pStr);
S
slguan 已提交
349
          } else {
350
            tscInfo("charset changed from %s to %s", tsCharset, pStr);
S
slguan 已提交
351 352
          }

S
Shengliang Guan 已提交
353
          tstrncpy(tsCharset, pStr, TSDB_LOCALE_LEN);
S
slguan 已提交
354
          cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
H
hzcheng 已提交
355
        } else {
356
          tscInfo("charset:%s not valid", pStr);
H
hzcheng 已提交
357 358
        }
      } else {
weixin_48148422's avatar
weixin_48148422 已提交
359 360
        tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr,
                tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
H
hzcheng 已提交
361 362 363 364 365 366
      }

      break;
    }

    case TSDB_OPTION_TIMEZONE:
S
slguan 已提交
367
      cfg = taosGetConfigOption("timezone");
H
hjxilinx 已提交
368 369
      assert(cfg != NULL);
    
S
slguan 已提交
370
      if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
S
Shengliang Guan 已提交
371
        tstrncpy(tsTimezone, pStr, TSDB_TIMEZONE_LEN);
H
hzcheng 已提交
372
        tsSetTimeZone();
S
slguan 已提交
373
        cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
374
        tscDebug("timezone set:%s, input:%s by taos_options", tsTimezone, pStr);
H
hzcheng 已提交
375
      } else {
weixin_48148422's avatar
weixin_48148422 已提交
376 377
        tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, pStr,
                tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
H
hzcheng 已提交
378 379
      }
      break;
380

H
hzcheng 已提交
381
    default:
H
hjxilinx 已提交
382
      // TODO return the correct error code to client in the format for taos_errstr()
H
hzcheng 已提交
383 384 385 386 387 388
      tscError("Invalid option %d", option);
      return -1;
  }

  return 0;
}
weixin_48148422's avatar
weixin_48148422 已提交
389 390 391 392 393 394

int taos_options(TSDB_OPTION option, const void *arg, ...) {
  static int32_t lock = 0;

  for (int i = 1; atomic_val_compare_exchange_32(&lock, 0, 1) != 0; ++i) {
    if (i % 1000 == 0) {
395
      tscInfo("haven't acquire lock after spin %d times.", i);
weixin_48148422's avatar
weixin_48148422 已提交
396 397 398 399 400 401 402 403
      sched_yield();
    }
  }

  int ret = taos_options_imp(option, (const char*)arg);

  atomic_store_32(&lock, 0);
  return ret;
L
lihui 已提交
404
}