clientEnv.c 20.8 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "catalog.h"
H
Haojun Liao 已提交
17 18
#include "clientInt.h"
#include "clientLog.h"
dengyihao's avatar
dengyihao 已提交
19 20
#include "functionMgt.h"
#include "os.h"
21
#include "query.h"
X
Xiaoyu Wang 已提交
22
#include "qworker.h"
X
Xiaoyu Wang 已提交
23
#include "scheduler.h"
H
Haojun Liao 已提交
24 25
#include "tcache.h"
#include "tglobal.h"
dengyihao's avatar
dengyihao 已提交
26
#include "tmsg.h"
H
Haojun Liao 已提交
27 28
#include "tref.h"
#include "trpc.h"
dengyihao's avatar
dengyihao 已提交
29
#include "tsched.h"
H
Haojun Liao 已提交
30
#include "ttime.h"
D
dapan1121 已提交
31
#include "thttp.h"
H
Haojun Liao 已提交
32 33

#define TSC_VAR_NOT_RELEASE 1
34
#define TSC_VAR_RELEASED    0
H
Haojun Liao 已提交
35

dengyihao's avatar
dengyihao 已提交
36
SAppInfo appInfo;
D
dapan1121 已提交
37
int64_t  lastClusterId = 0;
L
Liu Jicong 已提交
38
int32_t  clientReqRefPool = -1;
dengyihao's avatar
dengyihao 已提交
39
int32_t  clientConnRefPool = -1;
D
dapan1121 已提交
40
int32_t  clientStop = 0;
H
Haojun Liao 已提交
41

dengyihao's avatar
dengyihao 已提交
42 43
int32_t timestampDeltaLimit = 900;  // s

L
Liu Jicong 已提交
44 45
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t    tscInitRes = 0;
H
Haojun Liao 已提交
46

dengyihao's avatar
dengyihao 已提交
47
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
H
Haojun Liao 已提交
48
  // connection has been released already, abort creating request.
H
Haojun Liao 已提交
49
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);
H
Haojun Liao 已提交
50 51 52 53

  int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);

  if (pTscObj->pAppInfo) {
D
dapan1121 已提交
54
    SAppClusterSummary *pSummary = &pTscObj->pAppInfo->summary;
H
Haojun Liao 已提交
55

L
Liu Jicong 已提交
56 57
    int32_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
    int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
dengyihao's avatar
dengyihao 已提交
58 59
    tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
             ", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
D
dapan1121 已提交
60
             pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
H
Haojun Liao 已提交
61
  }
D
dapan1121 已提交
62 63

  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
64 65
}

dengyihao's avatar
dengyihao 已提交
66
static void deregisterRequest(SRequestObj *pRequest) {
dengyihao's avatar
dengyihao 已提交
67
  const static int64_t SLOW_QUERY_INTERVAL = 3000000L;  // todo configurable
H
Haojun Liao 已提交
68 69
  assert(pRequest != NULL);

dengyihao's avatar
dengyihao 已提交
70
  STscObj            *pTscObj = pRequest->pTscObj;
D
dapan1121 已提交
71
  SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
H
Haojun Liao 已提交
72

L
Liu Jicong 已提交
73
  int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
H
Haojun Liao 已提交
74 75
  int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);

76
  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
dengyihao's avatar
dengyihao 已提交
77 78
  tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64
           " elapsed:%.2f ms, "
79 80
           "current:%d, app current:%d",
           pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
D
dapan1121 已提交
81

82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
  if (pRequest->pQuery && pRequest->pQuery->pRoot) {
    if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->pQuery->pRoot->type && (0 == ((SVnodeModifOpStmt*)pRequest->pQuery->pRoot)->sqlNodeType)) {
      tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
              "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
              duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, 
              pRequest->metric.planCostUs, pRequest->metric.execCostUs);
      atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
              "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
              duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, 
              pRequest->metric.planCostUs, pRequest->metric.execCostUs);

      atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
    }
D
dapan1121 已提交
97
  }
dengyihao's avatar
dengyihao 已提交
98

D
dapan1121 已提交
99 100 101
  if (duration >= SLOW_QUERY_INTERVAL) {
    atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
  }
dengyihao's avatar
dengyihao 已提交
102

D
dapan1121 已提交
103
  releaseTscObj(pTscObj->id);
H
Haojun Liao 已提交
104 105
}

106
// todo close the transporter properly
D
dapan1121 已提交
107 108
void closeTransporter(SAppInstInfo *pAppInfo) {
  if (pAppInfo == NULL || pAppInfo->pTransporter == NULL) {
H
Haojun Liao 已提交
109 110 111
    return;
  }

D
dapan1121 已提交
112 113
  tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
  rpcClose(pAppInfo->pTransporter);
H
Haojun Liao 已提交
114 115
}

dengyihao's avatar
dengyihao 已提交
116
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
D
dapan1121 已提交
117
  if (NEED_REDIRECT_ERROR(code)) {
dengyihao's avatar
dengyihao 已提交
118
    if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
D
dapan1121 已提交
119
        msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT || msgType == TDMT_SCH_DROP_TASK) {
dengyihao's avatar
dengyihao 已提交
120 121
      return false;
    }
D
dapan1121 已提交
122 123 124 125 126 127
    return true;
  } else {
    return false;
  }
}

dengyihao's avatar
dengyihao 已提交
128 129
// start timer for particular msgType
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
dengyihao's avatar
dengyihao 已提交
130 131 132
  if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_CREATE_TABLE) {
    return true;
  }
dengyihao's avatar
dengyihao 已提交
133 134 135
  return false;
}

H
Haojun Liao 已提交
136
// TODO refactor
dengyihao's avatar
dengyihao 已提交
137
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
H
Haojun Liao 已提交
138 139 140 141 142 143
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = 0;
  rpcInit.label = "TSC";
  rpcInit.numOfThreads = numOfThread;
  rpcInit.cfp = processMsgFromServer;
D
dapan1121 已提交
144
  rpcInit.rfp = clientRpcRfp;
S
Shengliang Guan 已提交
145
  rpcInit.sessions = 1024;
H
Haojun Liao 已提交
146 147
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = (char *)user;
S
Shengliang Guan 已提交
148
  rpcInit.idleTime = tsShellActivityTimer * 1000;
dengyihao's avatar
dengyihao 已提交
149
  rpcInit.compressSize = tsCompressMsgSize;
dengyihao's avatar
dengyihao 已提交
150
  rpcInit.dfp = destroyAhandle;
dengyihao's avatar
dengyihao 已提交
151

dengyihao's avatar
dengyihao 已提交
152 153 154 155
  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
  rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
dengyihao's avatar
dengyihao 已提交
156

dengyihao's avatar
dengyihao 已提交
157
  void *pDnodeConn = rpcOpen(&rpcInit);
H
Haojun Liao 已提交
158 159 160 161 162 163 164 165
  if (pDnodeConn == NULL) {
    tscError("failed to init connection to server");
    return NULL;
  }

  return pDnodeConn;
}

D
dapan1121 已提交
166
void destroyAllRequests(SHashObj *pRequests) {
L
Liu Jicong 已提交
167
  void *pIter = taosHashIterate(pRequests, NULL);
D
dapan1121 已提交
168 169 170
  while (pIter != NULL) {
    int64_t *rid = pIter;

D
dapan1121 已提交
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
    SRequestObj *pRequest = acquireRequest(*rid);
    if (pRequest) {
      destroyRequest(pRequest);
      releaseRequest(*rid);
    }

    pIter = taosHashIterate(pRequests, pIter);
  }
}

void stopAllRequests(SHashObj *pRequests) {
  void *pIter = taosHashIterate(pRequests, NULL);
  while (pIter != NULL) {
    int64_t *rid = pIter;

    SRequestObj *pRequest = acquireRequest(*rid);
    if (pRequest) {
      taos_stop_query(pRequest);
      releaseRequest(*rid);
    }
L
Liu Jicong 已提交
191

D
dapan1121 已提交
192 193 194 195
    pIter = taosHashIterate(pRequests, pIter);
  }
}

dengyihao's avatar
dengyihao 已提交
196
void destroyAppInst(SAppInstInfo *pAppInfo) {
D
dapan1121 已提交
197
  tscDebug("destroy app inst mgr %p", pAppInfo);
D
dapan1121 已提交
198 199

  taosThreadMutexLock(&appInfo.mutex);
dengyihao's avatar
dengyihao 已提交
200

D
dapan1121 已提交
201 202
  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
  taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey));
D
dapan1121 已提交
203 204 205

  taosThreadMutexUnlock(&appInfo.mutex);

D
dapan1121 已提交
206 207
  taosMemoryFreeClear(pAppInfo->instKey);
  closeTransporter(pAppInfo);
dengyihao's avatar
dengyihao 已提交
208

D
dapan1121 已提交
209 210
  taosThreadMutexLock(&pAppInfo->qnodeMutex);
  taosArrayDestroy(pAppInfo->pQnodeList);
dengyihao's avatar
dengyihao 已提交
211
  taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
D
dapan1121 已提交
212 213 214 215

  taosMemoryFree(pAppInfo);
}

H
Haojun Liao 已提交
216
void destroyTscObj(void *pObj) {
D
dapan1121 已提交
217 218 219
  if (NULL == pObj) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
220

H
Haojun Liao 已提交
221
  STscObj *pTscObj = pObj;
dengyihao's avatar
dengyihao 已提交
222
  int64_t  tscId = pTscObj->id;
D
dapan1121 已提交
223
  tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
H
Haojun Liao 已提交
224

D
dapan1121 已提交
225
  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
226
  hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
227

D
dapan1121 已提交
228
  destroyAllRequests(pTscObj->pRequests);
229
  taosHashCleanup(pTscObj->pRequests);
dengyihao's avatar
dengyihao 已提交
230

D
dapan1121 已提交
231
  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
dengyihao's avatar
dengyihao 已提交
232
  tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
D
dapan1121 已提交
233 234
           pTscObj->pAppInfo->numOfConns);

H
Haojun Liao 已提交
235
  // In any cases, we should not free app inst here. Or an race condition rises.
dengyihao's avatar
dengyihao 已提交
236
  /*int64_t connNum = */ atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
237

wafwerar's avatar
wafwerar 已提交
238
  taosThreadMutexDestroy(&pTscObj->mutex);
D
dapan1121 已提交
239
  taosMemoryFree(pTscObj);
D
dapan1121 已提交
240

D
dapan1121 已提交
241
  tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
H
Haojun Liao 已提交
242 243
}

244
void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) {
wafwerar's avatar
wafwerar 已提交
245
  STscObj *pObj = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
H
Haojun Liao 已提交
246
  if (NULL == pObj) {
S
Shengliang Guan 已提交
247
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
248 249 250
    return NULL;
  }

D
dapan1121 已提交
251 252 253
  pObj->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pObj->pRequests) {
    taosMemoryFree(pObj);
S
Shengliang Guan 已提交
254
    terrno = TSDB_CODE_OUT_OF_MEMORY;
D
dapan1121 已提交
255 256
    return NULL;
  }
L
Liu Jicong 已提交
257

258
  pObj->connType = connType;
H
Haojun Liao 已提交
259 260 261 262
  pObj->pAppInfo = pAppInfo;
  tstrncpy(pObj->user, user, sizeof(pObj->user));
  memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);

263 264 265 266
  if (db != NULL) {
    tstrncpy(pObj->db, db, tListLen(pObj->db));
  }

wafwerar's avatar
wafwerar 已提交
267
  taosThreadMutexInit(&pObj->mutex, NULL);
D
dapan1121 已提交
268
  pObj->id = taosAddRef(clientConnRefPool, pObj);
269
  pObj->schemalessType = 1;
H
Haojun Liao 已提交
270

D
dapan1121 已提交
271 272
  atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1);

D
dapan1121 已提交
273
  tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pObj->id, pObj);
H
Haojun Liao 已提交
274 275 276
  return pObj;
}

L
Liu Jicong 已提交
277
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
D
dapan1121 已提交
278

L
Liu Jicong 已提交
279
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
D
dapan1121 已提交
280

dengyihao's avatar
dengyihao 已提交
281
void *createRequest(uint64_t connId, int32_t type, int64_t reqid) {
wafwerar's avatar
wafwerar 已提交
282
  SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
H
Haojun Liao 已提交
283
  if (NULL == pRequest) {
S
Shengliang Guan 已提交
284
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
285 286 287
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
288
  STscObj *pTscObj = acquireTscObj(connId);
289
  if (pTscObj == NULL) {
290
    taosMemoryFree(pRequest);
291 292 293 294
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

L
Liu Jicong 已提交
295
  pRequest->resType = RES_TYPE__QUERY;
dengyihao's avatar
dengyihao 已提交
296
  pRequest->requestId = reqid == 0 ? generateRequestId() : reqid;
D
dapan1121 已提交
297
  pRequest->metric.start = taosGetTimestampUs();
H
Haojun Liao 已提交
298

299
  pRequest->body.resInfo.convertUcs4 = true;  // convert ucs4 by default
dengyihao's avatar
dengyihao 已提交
300
  pRequest->type = type;
301
  pRequest->allocatorRefId = -1;
302 303 304 305

  pRequest->pDb = getDbOfConnection(pTscObj);
  pRequest->pTscObj = pTscObj;

wafwerar's avatar
wafwerar 已提交
306
  pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
D
stmt  
dapan1121 已提交
307
  pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
H
Haojun Liao 已提交
308 309
  tsem_init(&pRequest->body.rspSem, 0, 0);

310
  if (registerRequest(pRequest, pTscObj)) {
D
dapan1121 已提交
311 312 313
    doDestroyRequest(pRequest);
    return NULL;
  }
L
Liu Jicong 已提交
314

H
Haojun Liao 已提交
315 316 317
  return pRequest;
}

L
Liu Jicong 已提交
318
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
wafwerar's avatar
wafwerar 已提交
319 320 321 322 323
  taosMemoryFreeClear(pResInfo->pRspMsg);
  taosMemoryFreeClear(pResInfo->length);
  taosMemoryFreeClear(pResInfo->row);
  taosMemoryFreeClear(pResInfo->pCol);
  taosMemoryFreeClear(pResInfo->fields);
324
  taosMemoryFreeClear(pResInfo->userFields);
wmmhello's avatar
wmmhello 已提交
325
  taosMemoryFreeClear(pResInfo->convertJson);
326 327 328

  if (pResInfo->convertBuf != NULL) {
    for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
wafwerar's avatar
wafwerar 已提交
329
      taosMemoryFreeClear(pResInfo->convertBuf[i]);
330
    }
wafwerar's avatar
wafwerar 已提交
331
    taosMemoryFreeClear(pResInfo->convertBuf);
332
  }
H
Haojun Liao 已提交
333 334
}

D
dapan1121 已提交
335 336 337 338 339 340
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }

int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); }

int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }

D
dapan1121 已提交
341
void doDestroyRequest(void *p) {
D
dapan1121 已提交
342 343 344
  if (NULL == p) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
345

dengyihao's avatar
dengyihao 已提交
346
  SRequestObj *pRequest = (SRequestObj *)p;
dengyihao's avatar
dengyihao 已提交
347

D
dapan1121 已提交
348
  uint64_t reqId = pRequest->requestId;
D
dapan1121 已提交
349
  tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
dengyihao's avatar
dengyihao 已提交
350

D
dapan1121 已提交
351
  taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
L
Liu Jicong 已提交
352

D
dapan1121 已提交
353
  schedulerFreeJob(&pRequest->body.queryJob, 0);
D
dapan1121 已提交
354

wafwerar's avatar
wafwerar 已提交
355 356
  taosMemoryFreeClear(pRequest->msgBuf);
  taosMemoryFreeClear(pRequest->pDb);
H
Haojun Liao 已提交
357

H
Haojun Liao 已提交
358
  doFreeReqResultInfo(&pRequest->body.resInfo);
359
  tsem_destroy(&pRequest->body.rspSem);
H
Haojun Liao 已提交
360

X
Xiaoyu Wang 已提交
361 362
  taosArrayDestroy(pRequest->tableList);
  taosArrayDestroy(pRequest->dbList);
D
dapan1121 已提交
363
  taosArrayDestroy(pRequest->targetTableList);
X
Xiaoyu Wang 已提交
364

D
dapan1121 已提交
365
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
D
dapan1121 已提交
366

D
dapan1121 已提交
367 368 369
  if (pRequest->self) {
    deregisterRequest(pRequest);
  }
D
dapan1121 已提交
370

H
Haojun Liao 已提交
371
  if (pRequest->syncQuery) {
372 373 374
      if (pRequest->body.param){
        tsem_destroy(&((SSyncQueryParam*)pRequest->body.param)->sem);
      }
H
Haojun Liao 已提交
375 376 377
    taosMemoryFree(pRequest->body.param);
  }

378 379 380
  qDestroyQuery(pRequest->pQuery);
  nodesDestroyAllocator(pRequest->allocatorRefId);

D
dapan1121 已提交
381
  taosMemoryFreeClear(pRequest->sqlstr);
382
  taosMemoryFree(pRequest);
D
dapan1121 已提交
383
  tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
H
Haojun Liao 已提交
384 385
}

dengyihao's avatar
dengyihao 已提交
386
void destroyRequest(SRequestObj *pRequest) {
H
Haojun Liao 已提交
387 388 389 390
  if (pRequest == NULL) {
    return;
  }

D
dapan1121 已提交
391
  taos_stop_query(pRequest);
D
dapan1121 已提交
392
  removeRequest(pRequest->self);
H
Haojun Liao 已提交
393 394
}

D
dapan 已提交
395 396
void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); }

D
dapan1121 已提交
397 398 399 400 401 402 403 404 405 406 407
static void *tscCrashReportThreadFp(void *param) {
  setThreadName("client-crashReport");
  char filepath[PATH_MAX] = {0};
  snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
  char *pMsg = NULL;
  int64_t msgLen = 0;
  TdFilePtr pFile = NULL;
  bool truncateFile = false;
  int32_t sleepTime = 200;
  int32_t reportPeriodNum = 3600 * 1000 / sleepTime;
  int32_t loopTimes = reportPeriodNum;
D
dapan 已提交
408 409 410 411 412 413

#ifdef WINDOWS
  if (taosCheckCurrentInDll()) {
    atexit(crashReportThreadFuncUnexpectedStopped);
  }
#endif
D
dapan1121 已提交
414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480
  
  while (1) {
    if (clientStop) break;
    if (loopTimes++ < reportPeriodNum) {
      taosMsleep(sleepTime);
      continue;
    }

    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
    if (pMsg && msgLen > 0) {
      if (taosSendHttpReport(tsTelemServer, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
        tscError("failed to send crash report");
        if (pFile) {
          taosReleaseCrashLogFile(pFile, false);
          continue;
        }
      } else {
        tscInfo("succeed to send crash report");
        truncateFile = true;
      }
    } else {
      tscDebug("no crash info");
    }

    taosMemoryFree(pMsg);

    if (pMsg && msgLen > 0) {
      pMsg = NULL;
      continue;
    }
    
    if (pFile) {
      taosReleaseCrashLogFile(pFile, truncateFile);
      truncateFile = false;
    }
    
    taosMsleep(sleepTime);
    loopTimes = 0;
  }

  clientStop = -1;
  return NULL;
}

int32_t tscCrashReportInit() {
  if (!tsEnableCrashReport) {
    return 0;
  }
  
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
  TdThread            crashReportThread;
  if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
    tscError("failed to create crashReport thread since %s", strerror(errno));
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  return 0;
}

void tscStopCrashReport() {
  if (!tsEnableCrashReport) {
    return;
  }

D
dapan 已提交
481 482 483 484 485
  if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
    tscDebug("hb thread already stopped");
    return;
  }

D
dapan1121 已提交
486 487 488 489 490
  while (atomic_load_32(&clientStop) > 0) {
    taosMsleep(100);
  }
}

D
dapan1121 已提交
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507

void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
  char *pMsg = NULL;
  const char *flags = "UTL FATAL ";
  ELogLevel   level = DEBUG_FATAL;
  int32_t     dflag = 255;
  int64_t     msgLen= -1;

  if (tsEnableCrashReport) {
    if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) {
      taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
    } else {
      msgLen = strlen(pMsg);  
    }
  }

  taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
D
dapan1121 已提交
508 509
}

D
dapan1121 已提交
510

H
Haojun Liao 已提交
511 512 513 514 515
void taos_init_imp(void) {
  // In the APIs of other program language, taos_cleanup is not available yet.
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
  atexit(taos_cleanup);
  errno = TSDB_CODE_SUCCESS;
wafwerar's avatar
wafwerar 已提交
516
  taosSeedRand(taosGetTimestampSec());
H
Haojun Liao 已提交
517

D
dapan1121 已提交
518 519 520 521
  appInfo.pid = taosGetPId();
  appInfo.startTime = taosGetTimestampMs();
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);

H
Haojun Liao 已提交
522
  deltaToUtcInitOnce();
S
Shengliang Guan 已提交
523

wafwerar's avatar
wafwerar 已提交
524
  if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
A
Alex Duan 已提交
525 526
    // ignore create log failed, only print
    printf(" WARING: Create taoslog failed. configDir=%s\n", configDir);
S
Shengliang Guan 已提交
527 528
  }

wafwerar's avatar
wafwerar 已提交
529
  if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
S
Shengliang Guan 已提交
530 531 532 533
    tscInitRes = -1;
    return;
  }

534
  initQueryModuleMsgHandle();
H
Haojun Liao 已提交
535

S
Shengliang Guan 已提交
536 537 538
  if (taosConvInit() != 0) {
    ASSERTS(0, "failed to init conv");
  }
dengyihao's avatar
dengyihao 已提交
539

S
Shengliang Guan 已提交
540
  rpcInit();
H
Haojun Liao 已提交
541

D
dapan1121 已提交
542
  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
543 544
  catalogInit(&cfg);

D
dapan1121 已提交
545
  schedulerInit();
S
Shengliang Guan 已提交
546
  tscDebug("starting to initialize TAOS driver");
H
Haojun Liao 已提交
547

wafwerar's avatar
wafwerar 已提交
548
#ifndef WINDOWS
H
Haojun Liao 已提交
549
  taosSetCoreDump(true);
wafwerar's avatar
wafwerar 已提交
550
#endif
H
Haojun Liao 已提交
551 552

  initTaskQueue();
553
  fmFuncMgtInit();
554
  nodesInitAllocatorSet();
H
Haojun Liao 已提交
555

556
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
dengyihao's avatar
dengyihao 已提交
557
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
H
Haojun Liao 已提交
558

dengyihao's avatar
dengyihao 已提交
559
  // transDestroyBuffer(&conn->readBuf);
H
Haojun Liao 已提交
560
  taosGetAppName(appInfo.appName, NULL);
wafwerar's avatar
wafwerar 已提交
561
  taosThreadMutexInit(&appInfo.mutex, NULL);
H
Haojun Liao 已提交
562

D
dapan1121 已提交
563 564
  tscCrashReportInit();
  
H
Haojun Liao 已提交
565 566 567 568
  tscDebug("client is initialized successfully");
}

int taos_init() {
wafwerar's avatar
wafwerar 已提交
569
  taosThreadOnce(&tscinit, taos_init_imp);
H
Haojun Liao 已提交
570 571 572 573
  return tscInitRes;
}

int taos_options_imp(TSDB_OPTION option, const char *str) {
H
Haojun Liao 已提交
574
  if (option == TSDB_OPTION_CONFIGDIR) {
S
Shengliang Guan 已提交
575 576 577
    tstrncpy(configDir, str, PATH_MAX);
    tscInfo("set cfg:%s to %s", configDir, str);
    return 0;
H
Haojun Liao 已提交
578 579
  } else {
    taos_init();  // initialize global config
S
Shengliang Guan 已提交
580 581
  }

dengyihao's avatar
dengyihao 已提交
582
  SConfig     *pCfg = taosGetCfg();
S
Shengliang Guan 已提交
583
  SConfigItem *pItem = NULL;
H
Haojun Liao 已提交
584 585 586

  switch (option) {
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
S
Shengliang Guan 已提交
587
      pItem = cfgGetItem(pCfg, "shellActivityTimer");
H
Haojun Liao 已提交
588
      break;
S
Shengliang Guan 已提交
589 590
    case TSDB_OPTION_LOCALE:
      pItem = cfgGetItem(pCfg, "locale");
H
Haojun Liao 已提交
591
      break;
S
Shengliang Guan 已提交
592 593
    case TSDB_OPTION_CHARSET:
      pItem = cfgGetItem(pCfg, "charset");
H
Haojun Liao 已提交
594 595
      break;
    case TSDB_OPTION_TIMEZONE:
S
Shengliang Guan 已提交
596
      pItem = cfgGetItem(pCfg, "timezone");
H
Haojun Liao 已提交
597
      break;
598 599 600
    case TSDB_OPTION_USE_ADAPTER:
      pItem = cfgGetItem(pCfg, "useAdapter");
      break;
H
Haojun Liao 已提交
601
    default:
S
Shengliang Guan 已提交
602 603 604 605 606 607 608 609 610 611 612 613 614
      break;
  }

  if (pItem == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  int code = cfgSetItem(pCfg, pItem->name, str, CFG_STYPE_TAOS_OPTIONS);
  if (code != 0) {
    tscError("failed to set cfg:%s to %s since %s", pItem->name, str, terrstr());
  } else {
    tscInfo("set cfg:%s to %s", pItem->name, str);
H
Haojun Liao 已提交
615
  }
S
Shengliang Guan 已提交
616 617

  return code;
H
Haojun Liao 已提交
618 619
}

620 621 622 623 624
/**
 * The request id is an unsigned integer format of 64bit.
 *+------------+-----+-----------+---------------+
 *| uid|localIp| PId | timestamp | serial number |
 *+------------+-----+-----------+---------------+
625
 *| 12bit      |12bit|24bit      |16bit          |
626 627 628 629
 *+------------+-----+-----------+---------------+
 * @return
 */
uint64_t generateRequestId() {
H
Haojun Liao 已提交
630
  static uint64_t hashId = 0;
dengyihao's avatar
dengyihao 已提交
631
  static uint32_t requestSerialId = 0;
H
Haojun Liao 已提交
632 633 634 635 636 637 638 639 640 641 642

  if (hashId == 0) {
    char    uid[64] = {0};
    int32_t code = taosGetSystemUUID(uid, tListLen(uid));
    if (code != TSDB_CODE_SUCCESS) {
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
               tstrerror(TAOS_SYSTEM_ERROR(errno)));

    } else {
      hashId = MurmurHash3_32(uid, strlen(uid));
    }
643 644
  }

D
dapan1121 已提交
645
  uint64_t id = 0;
L
Liu Jicong 已提交
646

D
dapan1121 已提交
647 648 649
  while (true) {
    int64_t  ts = taosGetTimestampMs();
    uint64_t pid = taosGetPId();
dengyihao's avatar
dengyihao 已提交
650 651
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);
652

D
dapan1121 已提交
653 654 655 656 657
    id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
    if (id) {
      break;
    }
  }
658 659 660
  return id;
}

H
Haojun Liao 已提交
661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718
#if 0
#include "cJSON.h"
static setConfRet taos_set_config_imp(const char *config){
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
  static bool setConfFlag = false;
  if (setConfFlag) {
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
    strcpy(ret.retMsg, "configuration can only set once");
    return ret;
  }
  taosInitGlobalCfg();
  cJSON *root = cJSON_Parse(config);
  if (root == NULL){
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
    strcpy(ret.retMsg, "parse json error");
    return ret;
  }

  int size = cJSON_GetArraySize(root);
  if(!cJSON_IsObject(root) || size == 0) {
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
    strcpy(ret.retMsg, "json content is invalid, must be not empty object");
    return ret;
  }

  if(size >= 1000) {
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
    strcpy(ret.retMsg, "json object size is too long");
    return ret;
  }

  for(int i = 0; i < size; i++){
    cJSON *item = cJSON_GetArrayItem(root, i);
    if(!item) {
      ret.retCode = SET_CONF_RET_ERR_INNER;
      strcpy(ret.retMsg, "inner error");
      return ret;
    }
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
      ret.retCode = SET_CONF_RET_ERR_PART;
      if (strlen(ret.retMsg) == 0){
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
      }else{
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
        size_t leftSize = tmp >= 0 ? tmp : 0;
        strncat(ret.retMsg, "|",  leftSize);
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
        leftSize = tmp >= 0 ? tmp : 0;
        strncat(ret.retMsg, item->string, leftSize);
      }
    }
  }
  cJSON_Delete(root);
  setConfFlag = true;
  return ret;
}

setConfRet taos_set_config(const char *config){
wafwerar's avatar
wafwerar 已提交
719
  taosThreadMutexLock(&setConfMutex);
H
Haojun Liao 已提交
720
  setConfRet ret = taos_set_config_imp(config);
wafwerar's avatar
wafwerar 已提交
721
  taosThreadMutexUnlock(&setConfMutex);
H
Haojun Liao 已提交
722 723
  return ret;
}
724
#endif