clientEnv.c 21.1 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "catalog.h"
H
Haojun Liao 已提交
17 18
#include "clientInt.h"
#include "clientLog.h"
dengyihao's avatar
dengyihao 已提交
19 20
#include "functionMgt.h"
#include "os.h"
21
#include "query.h"
X
Xiaoyu Wang 已提交
22
#include "qworker.h"
X
Xiaoyu Wang 已提交
23
#include "scheduler.h"
H
Haojun Liao 已提交
24 25
#include "tcache.h"
#include "tglobal.h"
dengyihao's avatar
dengyihao 已提交
26
#include "thttp.h"
dengyihao's avatar
dengyihao 已提交
27
#include "tmsg.h"
H
Haojun Liao 已提交
28 29
#include "tref.h"
#include "trpc.h"
dengyihao's avatar
dengyihao 已提交
30
#include "tsched.h"
H
Haojun Liao 已提交
31 32 33
#include "ttime.h"

#define TSC_VAR_NOT_RELEASE 1
34
#define TSC_VAR_RELEASED    0
H
Haojun Liao 已提交
35

dengyihao's avatar
dengyihao 已提交
36
SAppInfo appInfo;
D
dapan1121 已提交
37
int64_t  lastClusterId = 0;
L
Liu Jicong 已提交
38
int32_t  clientReqRefPool = -1;
dengyihao's avatar
dengyihao 已提交
39
int32_t  clientConnRefPool = -1;
D
dapan1121 已提交
40
int32_t  clientStop = 0;
H
Haojun Liao 已提交
41

dengyihao's avatar
dengyihao 已提交
42 43
int32_t timestampDeltaLimit = 900;  // s

L
Liu Jicong 已提交
44 45
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t    tscInitRes = 0;
H
Haojun Liao 已提交
46

dengyihao's avatar
dengyihao 已提交
47
/*
 * Add a newly built request to the request ref pool and bump the request
 * counters of its connection and, when present, of the application instance.
 *
 * pRequest  request being registered; its `self` field receives the ref id
 * pTscObj   connection the request belongs to
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);

  int32_t connReqs = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);

  // no application instance attached: there is no summary to update
  if (NULL == pTscObj->pAppInfo) {
    return TSDB_CODE_SUCCESS;
  }

  SAppClusterSummary *pStat = &pTscObj->pAppInfo->summary;

  int32_t totalReqs = atomic_add_fetch_64((int64_t *)&pStat->totalRequests, 1);
  int32_t appReqs = atomic_add_fetch_64((int64_t *)&pStat->currentRequests, 1);
  tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
           ", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
           pRequest->self, pRequest->pTscObj->id, connReqs, appReqs, totalReqs, pRequest->requestId);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
66
/*
 * Counterpart of registerRequest(), called while a request is destroyed.
 * Decrements the application and connection request counters, adds the
 * elapsed time of the request to the application summary and releases the
 * connection reference via releaseTscObj().
 */
static void deregisterRequest(SRequestObj *pRequest) {
  static const int64_t SLOW_QUERY_INTERVAL = 3000000L;  // microseconds; todo configurable
  assert(pRequest != NULL);

  STscObj            *pConn = pRequest->pTscObj;
  SAppClusterSummary *pStat = &pConn->pAppInfo->summary;

  int32_t appReqs = atomic_sub_fetch_64((int64_t *)&pStat->currentRequests, 1);
  int32_t connReqs = atomic_sub_fetch_32(&pConn->numOfReqs, 1);

  // metric.start was taken with taosGetTimestampUs(), so this is in microseconds
  int64_t elapsedUs = taosGetTimestampUs() - pRequest->metric.start;
  tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64
           " elapsed:%.2f ms, "
           "current:%d, app current:%d",
           pRequest->self, pConn->id, pRequest->requestId, elapsedUs / 1000.0, connReqs, appReqs);

  bool hasRoot = (NULL != pRequest->pQuery) && (NULL != pRequest->pQuery->pRoot);
  if (hasRoot) {
    bool isInsert = (QUERY_NODE_VNODE_MODIF_STMT == pRequest->pQuery->pRoot->type) &&
                    (0 == ((SVnodeModifOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType);

    if (isInsert) {
      tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
               "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
               elapsedUs, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
               pRequest->metric.planCostUs, pRequest->metric.execCostUs);
      atomic_add_fetch_64((int64_t *)&pStat->insertElapsedTime, elapsedUs);
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
               "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
               elapsedUs, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
               pRequest->metric.planCostUs, pRequest->metric.execCostUs);

      atomic_add_fetch_64((int64_t *)&pStat->queryElapsedTime, elapsedUs);
    }
  }

  if (elapsedUs >= SLOW_QUERY_INTERVAL) {
    atomic_add_fetch_64((int64_t *)&pStat->numOfSlowQueries, 1);
  }

  releaseTscObj(pConn->id);
}

107
// todo close the transporter properly
D
dapan1121 已提交
108 109
/*
 * Close the RPC transporter of an application instance.
 * Does nothing when the instance or its transporter is NULL.
 */
void closeTransporter(SAppInstInfo *pAppInfo) {
  if (pAppInfo != NULL && pAppInfo->pTransporter != NULL) {
    tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
    rpcClose(pAppInfo->pTransporter);
  }
}

dengyihao's avatar
dengyihao 已提交
117
/*
 * Callback installed as SRpcInit.rfp in openTransporter().
 * Returns true only when `code` satisfies NEED_REDIRECT_ERROR and the message
 * is not one of the TDMT_SCH_* types listed below.
 */
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
  if (!NEED_REDIRECT_ERROR(code)) {
    return false;
  }

  bool isSchedulerMsg = (msgType == TDMT_SCH_QUERY) || (msgType == TDMT_SCH_MERGE_QUERY) ||
                        (msgType == TDMT_SCH_FETCH) || (msgType == TDMT_SCH_MERGE_FETCH) ||
                        (msgType == TDMT_SCH_QUERY_HEARTBEAT) || (msgType == TDMT_SCH_DROP_TASK);

  return !isSchedulerMsg;
}

dengyihao's avatar
dengyihao 已提交
129 130
// start timer for particular msgType: true only for TDMT_VND_SUBMIT and
// TDMT_VND_CREATE_TABLE; `code` is not consulted
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
  return (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_CREATE_TABLE) ? true : false;
}

H
Haojun Liao 已提交
137
// TODO refactor
dengyihao's avatar
dengyihao 已提交
138
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
H
Haojun Liao 已提交
139 140 141 142 143 144
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = 0;
  rpcInit.label = "TSC";
  rpcInit.numOfThreads = numOfThread;
  rpcInit.cfp = processMsgFromServer;
D
dapan1121 已提交
145
  rpcInit.rfp = clientRpcRfp;
S
Shengliang Guan 已提交
146
  rpcInit.sessions = 1024;
H
Haojun Liao 已提交
147 148
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = (char *)user;
S
Shengliang Guan 已提交
149
  rpcInit.idleTime = tsShellActivityTimer * 1000;
dengyihao's avatar
dengyihao 已提交
150
  rpcInit.compressSize = tsCompressMsgSize;
dengyihao's avatar
dengyihao 已提交
151
  rpcInit.dfp = destroyAhandle;
dengyihao's avatar
dengyihao 已提交
152

dengyihao's avatar
dengyihao 已提交
153 154 155 156
  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
  rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
dengyihao's avatar
dengyihao 已提交
157

dengyihao's avatar
dengyihao 已提交
158 159 160 161
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
  connLimitNum = TMAX(connLimitNum, 10);
  connLimitNum = TMIN(connLimitNum, 500);
  rpcInit.connLimitNum = connLimitNum;
dengyihao's avatar
dengyihao 已提交
162
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
dengyihao's avatar
dengyihao 已提交
163

dengyihao's avatar
dengyihao 已提交
164
  void *pDnodeConn = rpcOpen(&rpcInit);
H
Haojun Liao 已提交
165 166 167 168 169 170 171 172
  if (pDnodeConn == NULL) {
    tscError("failed to init connection to server");
    return NULL;
  }

  return pDnodeConn;
}

D
dapan1121 已提交
173
/*
 * Walk the request hash of a connection and call destroyRequest() on every
 * request that can still be acquired from the request ref pool.
 */
void destroyAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    // each hash entry holds the ref id of one request
    int64_t *pRid = pIter;

    SRequestObj *pReq = acquireRequest(*pRid);
    if (NULL == pReq) {
      continue;
    }

    destroyRequest(pReq);
    releaseRequest(*pRid);
  }
}

/*
 * Walk the request hash of a connection and call taos_stop_query() on every
 * request that can still be acquired from the request ref pool.
 */
void stopAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    // each hash entry holds the ref id of one request
    int64_t *pRid = pIter;

    SRequestObj *pReq = acquireRequest(*pRid);
    if (NULL == pReq) {
      continue;
    }

    taos_stop_query(pReq);
    releaseRequest(*pRid);
  }
}

dengyihao's avatar
dengyihao 已提交
203
/*
 * Destroy one application instance.
 *
 * Under appInfo.mutex the instance's heartbeat manager is passed to
 * hbRemoveAppHbMrg() and its key is removed from appInfo.pInstMap. Afterwards
 * the key string, the transporter, the qnode list and the instance itself are
 * released, in that order.
 */
void destroyAppInst(SAppInstInfo *pAppInfo) {
  tscDebug("destroy app inst mgr %p", pAppInfo);

  // step 1: unhook the instance from the process-wide state
  taosThreadMutexLock(&appInfo.mutex);
  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
  taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey));
  taosThreadMutexUnlock(&appInfo.mutex);

  // step 2: release what the instance owns
  taosMemoryFreeClear(pAppInfo->instKey);
  closeTransporter(pAppInfo);

  // the qnode list is guarded by its own mutex
  taosThreadMutexLock(&pAppInfo->qnodeMutex);
  taosArrayDestroy(pAppInfo->pQnodeList);
  taosThreadMutexUnlock(&pAppInfo->qnodeMutex);

  // step 3: the instance itself
  taosMemoryFree(pAppInfo);
}

H
Haojun Liao 已提交
223
void destroyTscObj(void *pObj) {
D
dapan1121 已提交
224 225 226
  if (NULL == pObj) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
227

H
Haojun Liao 已提交
228
  STscObj *pTscObj = pObj;
dengyihao's avatar
dengyihao 已提交
229
  int64_t  tscId = pTscObj->id;
D
dapan1121 已提交
230
  tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
H
Haojun Liao 已提交
231

D
dapan1121 已提交
232
  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
233
  hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
234

D
dapan1121 已提交
235
  destroyAllRequests(pTscObj->pRequests);
236
  taosHashCleanup(pTscObj->pRequests);
dengyihao's avatar
dengyihao 已提交
237

D
dapan1121 已提交
238
  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
dengyihao's avatar
dengyihao 已提交
239
  tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
D
dapan1121 已提交
240 241
           pTscObj->pAppInfo->numOfConns);

H
Haojun Liao 已提交
242
  // In any cases, we should not free app inst here. Or an race condition rises.
dengyihao's avatar
dengyihao 已提交
243
  /*int64_t connNum = */ atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
244

wafwerar's avatar
wafwerar 已提交
245
  taosThreadMutexDestroy(&pTscObj->mutex);
D
dapan1121 已提交
246
  taosMemoryFree(pTscObj);
D
dapan1121 已提交
247

D
dapan1121 已提交
248
  tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
H
Haojun Liao 已提交
249 250
}

251
/*
 * Allocate and initialise a connection object and add it to the connection
 * ref pool.
 *
 * user      user name, copied into the object
 * auth      TSDB_PASSWORD_LEN bytes copied into the object's `pass` field
 * db        optional default database name; may be NULL
 * connType  connection type stored in the object
 * pAppInfo  application instance the connection belongs to; its numOfConns
 *           counter is incremented
 *
 * Returns the new object, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when
 * an allocation fails.
 */
void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) {
  STscObj *pConn = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // per-connection table of request ref ids
  pConn->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (pConn->pRequests == NULL) {
    taosMemoryFree(pConn);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pConn->connType = connType;
  pConn->pAppInfo = pAppInfo;
  tstrncpy(pConn->user, user, sizeof(pConn->user));
  memcpy(pConn->pass, auth, TSDB_PASSWORD_LEN);

  if (NULL != db) {
    tstrncpy(pConn->db, db, tListLen(pConn->db));
  }

  taosThreadMutexInit(&pConn->mutex, NULL);
  pConn->id = taosAddRef(clientConnRefPool, pConn);
  pConn->schemalessType = 1;

  atomic_add_fetch_64(&pConn->pAppInfo->numOfConns, 1);

  tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pConn->id, pConn);
  return pConn;
}

L
Liu Jicong 已提交
284
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
D
dapan1121 已提交
285

L
Liu Jicong 已提交
286
// Release the connection with ref id `rid`; returns the result of taosReleaseRef().
int32_t releaseTscObj(int64_t rid) {
  int32_t code = taosReleaseRef(clientConnRefPool, rid);
  return code;
}
D
dapan1121 已提交
287

dengyihao's avatar
dengyihao 已提交
288
void *createRequest(uint64_t connId, int32_t type, int64_t reqid) {
wafwerar's avatar
wafwerar 已提交
289
  SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
H
Haojun Liao 已提交
290
  if (NULL == pRequest) {
S
Shengliang Guan 已提交
291
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
292 293 294
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
295
  STscObj *pTscObj = acquireTscObj(connId);
296
  if (pTscObj == NULL) {
297
    taosMemoryFree(pRequest);
298 299 300 301
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

L
Liu Jicong 已提交
302
  pRequest->resType = RES_TYPE__QUERY;
dengyihao's avatar
dengyihao 已提交
303
  pRequest->requestId = reqid == 0 ? generateRequestId() : reqid;
D
dapan1121 已提交
304
  pRequest->metric.start = taosGetTimestampUs();
H
Haojun Liao 已提交
305

306
  pRequest->body.resInfo.convertUcs4 = true;  // convert ucs4 by default
dengyihao's avatar
dengyihao 已提交
307
  pRequest->type = type;
308
  pRequest->allocatorRefId = -1;
309 310 311 312

  pRequest->pDb = getDbOfConnection(pTscObj);
  pRequest->pTscObj = pTscObj;

wafwerar's avatar
wafwerar 已提交
313
  pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
D
stmt  
dapan1121 已提交
314
  pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
H
Haojun Liao 已提交
315 316
  tsem_init(&pRequest->body.rspSem, 0, 0);

317
  if (registerRequest(pRequest, pTscObj)) {
D
dapan1121 已提交
318 319 320
    doDestroyRequest(pRequest);
    return NULL;
  }
L
Liu Jicong 已提交
321

H
Haojun Liao 已提交
322 323 324
  return pRequest;
}

L
Liu Jicong 已提交
325
/*
 * Free every buffer owned by a request result info and reset the freed
 * pointers through taosMemoryFreeClear(). The structure itself is not freed.
 */
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
  taosMemoryFreeClear(pResInfo->pRspMsg);
  taosMemoryFreeClear(pResInfo->length);
  taosMemoryFreeClear(pResInfo->row);
  taosMemoryFreeClear(pResInfo->pCol);
  taosMemoryFreeClear(pResInfo->fields);
  taosMemoryFreeClear(pResInfo->userFields);
  taosMemoryFreeClear(pResInfo->convertJson);

  if (NULL == pResInfo->convertBuf) {
    return;
  }

  // convertBuf holds one buffer per column
  int32_t col = 0;
  while (col < pResInfo->numOfCols) {
    taosMemoryFreeClear(pResInfo->convertBuf[col]);
    ++col;
  }
  taosMemoryFreeClear(pResInfo->convertBuf);
}

D
dapan1121 已提交
342 343 344 345 346 347
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }

// Release the request with ref id `rid`; returns the result of taosReleaseRef().
int32_t releaseRequest(int64_t rid) {
  int32_t code = taosReleaseRef(clientReqRefPool, rid);
  return code;
}

// Remove the request with ref id `rid` from the request ref pool; returns the
// result of taosRemoveRef().
int32_t removeRequest(int64_t rid) {
  int32_t code = taosRemoveRef(clientReqRefPool, rid);
  return code;
}

D
dapan1121 已提交
348
void doDestroyRequest(void *p) {
D
dapan1121 已提交
349 350 351
  if (NULL == p) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
352

dengyihao's avatar
dengyihao 已提交
353
  SRequestObj *pRequest = (SRequestObj *)p;
dengyihao's avatar
dengyihao 已提交
354

D
dapan1121 已提交
355
  uint64_t reqId = pRequest->requestId;
D
dapan1121 已提交
356
  tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
dengyihao's avatar
dengyihao 已提交
357

D
dapan1121 已提交
358
  taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
L
Liu Jicong 已提交
359

D
dapan1121 已提交
360
  schedulerFreeJob(&pRequest->body.queryJob, 0);
D
dapan1121 已提交
361

wafwerar's avatar
wafwerar 已提交
362 363
  taosMemoryFreeClear(pRequest->msgBuf);
  taosMemoryFreeClear(pRequest->pDb);
H
Haojun Liao 已提交
364

H
Haojun Liao 已提交
365
  doFreeReqResultInfo(&pRequest->body.resInfo);
366
  tsem_destroy(&pRequest->body.rspSem);
H
Haojun Liao 已提交
367

X
Xiaoyu Wang 已提交
368 369
  taosArrayDestroy(pRequest->tableList);
  taosArrayDestroy(pRequest->dbList);
D
dapan1121 已提交
370
  taosArrayDestroy(pRequest->targetTableList);
X
Xiaoyu Wang 已提交
371

D
dapan1121 已提交
372
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
D
dapan1121 已提交
373

D
dapan1121 已提交
374 375 376
  if (pRequest->self) {
    deregisterRequest(pRequest);
  }
D
dapan1121 已提交
377

H
Haojun Liao 已提交
378
  if (pRequest->syncQuery) {
dengyihao's avatar
dengyihao 已提交
379 380 381
    if (pRequest->body.param) {
      tsem_destroy(&((SSyncQueryParam *)pRequest->body.param)->sem);
    }
H
Haojun Liao 已提交
382 383 384
    taosMemoryFree(pRequest->body.param);
  }

385 386 387
  qDestroyQuery(pRequest->pQuery);
  nodesDestroyAllocator(pRequest->allocatorRefId);

D
dapan1121 已提交
388
  taosMemoryFreeClear(pRequest->sqlstr);
389
  taosMemoryFree(pRequest);
D
dapan1121 已提交
390
  tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
H
Haojun Liao 已提交
391 392
}

dengyihao's avatar
dengyihao 已提交
393
/*
 * Stop the query of a request and remove the request from the request ref
 * pool. A NULL request is ignored.
 */
void destroyRequest(SRequestObj *pRequest) {
  if (NULL != pRequest) {
    taos_stop_query(pRequest);
    removeRequest(pRequest->self);
  }
}

D
dapan 已提交
402 403
// atexit hook (registered on WINDOWS by the crash-report thread): marks the
// thread as finished by storing -1 into clientStop.
void crashReportThreadFuncUnexpectedStopped(void) {
  atomic_store_32(&clientStop, -1);
}

D
dapan1121 已提交
404 405 406 407
static void *tscCrashReportThreadFp(void *param) {
  setThreadName("client-crashReport");
  char filepath[PATH_MAX] = {0};
  snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
dengyihao's avatar
dengyihao 已提交
408 409
  char     *pMsg = NULL;
  int64_t   msgLen = 0;
D
dapan1121 已提交
410
  TdFilePtr pFile = NULL;
dengyihao's avatar
dengyihao 已提交
411 412 413 414
  bool      truncateFile = false;
  int32_t   sleepTime = 200;
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
  int32_t   loopTimes = reportPeriodNum;
D
dapan 已提交
415 416 417 418 419 420

#ifdef WINDOWS
  if (taosCheckCurrentInDll()) {
    atexit(crashReportThreadFuncUnexpectedStopped);
  }
#endif
dengyihao's avatar
dengyihao 已提交
421

D
dapan1121 已提交
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450
  while (1) {
    if (clientStop) break;
    if (loopTimes++ < reportPeriodNum) {
      taosMsleep(sleepTime);
      continue;
    }

    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
    if (pMsg && msgLen > 0) {
      if (taosSendHttpReport(tsTelemServer, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
        tscError("failed to send crash report");
        if (pFile) {
          taosReleaseCrashLogFile(pFile, false);
          continue;
        }
      } else {
        tscInfo("succeed to send crash report");
        truncateFile = true;
      }
    } else {
      tscDebug("no crash info");
    }

    taosMemoryFree(pMsg);

    if (pMsg && msgLen > 0) {
      pMsg = NULL;
      continue;
    }
dengyihao's avatar
dengyihao 已提交
451

D
dapan1121 已提交
452 453 454 455
    if (pFile) {
      taosReleaseCrashLogFile(pFile, truncateFile);
      truncateFile = false;
    }
dengyihao's avatar
dengyihao 已提交
456

D
dapan1121 已提交
457 458 459 460 461 462 463 464 465 466 467 468
    taosMsleep(sleepTime);
    loopTimes = 0;
  }

  clientStop = -1;
  return NULL;
}

int32_t tscCrashReportInit() {
  if (!tsEnableCrashReport) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
469

D
dapan1121 已提交
470 471 472
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
dengyihao's avatar
dengyihao 已提交
473
  TdThread crashReportThread;
D
dapan1121 已提交
474 475 476 477 478 479 480 481 482 483 484 485 486 487
  if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
    tscError("failed to create crashReport thread since %s", strerror(errno));
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  return 0;
}

void tscStopCrashReport() {
  if (!tsEnableCrashReport) {
    return;
  }

D
dapan 已提交
488 489 490 491 492
  if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
    tscDebug("hb thread already stopped");
    return;
  }

D
dapan1121 已提交
493 494 495 496 497
  while (atomic_load_32(&clientStop) > 0) {
    taosMsleep(100);
  }
}

D
dapan1121 已提交
498
void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
dengyihao's avatar
dengyihao 已提交
499
  char       *pMsg = NULL;
D
dapan1121 已提交
500 501 502
  const char *flags = "UTL FATAL ";
  ELogLevel   level = DEBUG_FATAL;
  int32_t     dflag = 255;
dengyihao's avatar
dengyihao 已提交
503
  int64_t     msgLen = -1;
D
dapan1121 已提交
504 505 506 507 508

  if (tsEnableCrashReport) {
    if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) {
      taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
    } else {
dengyihao's avatar
dengyihao 已提交
509
      msgLen = strlen(pMsg);
D
dapan1121 已提交
510 511 512 513
    }
  }

  taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
D
dapan1121 已提交
514 515
}

H
Haojun Liao 已提交
516 517 518 519 520
void taos_init_imp(void) {
  // In the APIs of other program language, taos_cleanup is not available yet.
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
  atexit(taos_cleanup);
  errno = TSDB_CODE_SUCCESS;
wafwerar's avatar
wafwerar 已提交
521
  taosSeedRand(taosGetTimestampSec());
H
Haojun Liao 已提交
522

D
dapan1121 已提交
523 524 525 526
  appInfo.pid = taosGetPId();
  appInfo.startTime = taosGetTimestampMs();
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);

H
Haojun Liao 已提交
527
  deltaToUtcInitOnce();
S
Shengliang Guan 已提交
528

wafwerar's avatar
wafwerar 已提交
529
  if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
A
Alex Duan 已提交
530 531
    // ignore create log failed, only print
    printf(" WARING: Create taoslog failed. configDir=%s\n", configDir);
S
Shengliang Guan 已提交
532 533
  }

wafwerar's avatar
wafwerar 已提交
534
  if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
S
Shengliang Guan 已提交
535 536 537 538
    tscInitRes = -1;
    return;
  }

539
  initQueryModuleMsgHandle();
H
Haojun Liao 已提交
540

S
Shengliang Guan 已提交
541 542 543
  if (taosConvInit() != 0) {
    ASSERTS(0, "failed to init conv");
  }
dengyihao's avatar
dengyihao 已提交
544

S
Shengliang Guan 已提交
545
  rpcInit();
H
Haojun Liao 已提交
546

D
dapan1121 已提交
547
  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
548 549
  catalogInit(&cfg);

D
dapan1121 已提交
550
  schedulerInit();
S
Shengliang Guan 已提交
551
  tscDebug("starting to initialize TAOS driver");
H
Haojun Liao 已提交
552

wafwerar's avatar
wafwerar 已提交
553
#ifndef WINDOWS
H
Haojun Liao 已提交
554
  taosSetCoreDump(true);
wafwerar's avatar
wafwerar 已提交
555
#endif
H
Haojun Liao 已提交
556 557

  initTaskQueue();
558
  fmFuncMgtInit();
559
  nodesInitAllocatorSet();
H
Haojun Liao 已提交
560

561
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
dengyihao's avatar
dengyihao 已提交
562
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
H
Haojun Liao 已提交
563

dengyihao's avatar
dengyihao 已提交
564
  // transDestroyBuffer(&conn->readBuf);
H
Haojun Liao 已提交
565
  taosGetAppName(appInfo.appName, NULL);
wafwerar's avatar
wafwerar 已提交
566
  taosThreadMutexInit(&appInfo.mutex, NULL);
H
Haojun Liao 已提交
567

D
dapan1121 已提交
568
  tscCrashReportInit();
dengyihao's avatar
dengyihao 已提交
569

H
Haojun Liao 已提交
570 571 572 573
  tscDebug("client is initialized successfully");
}

int taos_init() {
wafwerar's avatar
wafwerar 已提交
574
  taosThreadOnce(&tscinit, taos_init_imp);
H
Haojun Liao 已提交
575 576 577 578
  return tscInitRes;
}

/*
 * Apply one client option.
 *
 * option  which option to set
 * str     new value as a string
 *
 * TSDB_OPTION_CONFIGDIR only records the directory in configDir and does not
 * trigger initialisation. Every other option first runs taos_init() and then
 * updates the matching configuration item.
 *
 * Returns 0 on success, -1 for an unknown option or a NULL config directory,
 * otherwise the code returned by cfgSetItem().
 */
int taos_options_imp(TSDB_OPTION option, const char *str) {
  if (option == TSDB_OPTION_CONFIGDIR) {
    if (str == NULL) {
      tscError("Invalid option %d", option);
      return -1;
    }

    tstrncpy(configDir, str, PATH_MAX);
    // log the option name and the value actually stored
    tscInfo("set cfg:%s to %s", "configDir", configDir);
    return 0;
  } else {
    taos_init();  // initialize global config
  }

  SConfig     *pCfg = taosGetCfg();
  SConfigItem *pItem = NULL;

  switch (option) {
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
      pItem = cfgGetItem(pCfg, "shellActivityTimer");
      break;
    case TSDB_OPTION_LOCALE:
      pItem = cfgGetItem(pCfg, "locale");
      break;
    case TSDB_OPTION_CHARSET:
      pItem = cfgGetItem(pCfg, "charset");
      break;
    case TSDB_OPTION_TIMEZONE:
      pItem = cfgGetItem(pCfg, "timezone");
      break;
    case TSDB_OPTION_USE_ADAPTER:
      pItem = cfgGetItem(pCfg, "useAdapter");
      break;
    default:
      break;
  }

  // unknown option, or the item is not present in the configuration
  if (pItem == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  int code = cfgSetItem(pCfg, pItem->name, str, CFG_STYPE_TAOS_OPTIONS);
  if (code != 0) {
    tscError("failed to set cfg:%s to %s since %s", pItem->name, str, terrstr());
  } else {
    tscInfo("set cfg:%s to %s", pItem->name, str);
  }

  return code;
}

625 626 627 628 629
/**
 * The request id is an unsigned integer format of 64bit.
 *+------------+-----+-----------+---------------+
 *| uid|localIp| PId | timestamp | serial number |
 *+------------+-----+-----------+---------------+
630
 *| 12bit      |12bit|24bit      |16bit          |
631 632 633 634
 *+------------+-----+-----------+---------------+
 * @return
 */
uint64_t generateRequestId() {
H
Haojun Liao 已提交
635
  static uint64_t hashId = 0;
dengyihao's avatar
dengyihao 已提交
636
  static uint32_t requestSerialId = 0;
H
Haojun Liao 已提交
637 638 639 640 641 642 643 644 645 646 647

  if (hashId == 0) {
    char    uid[64] = {0};
    int32_t code = taosGetSystemUUID(uid, tListLen(uid));
    if (code != TSDB_CODE_SUCCESS) {
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
               tstrerror(TAOS_SYSTEM_ERROR(errno)));

    } else {
      hashId = MurmurHash3_32(uid, strlen(uid));
    }
648 649
  }

D
dapan1121 已提交
650
  uint64_t id = 0;
L
Liu Jicong 已提交
651

D
dapan1121 已提交
652 653 654
  while (true) {
    int64_t  ts = taosGetTimestampMs();
    uint64_t pid = taosGetPId();
dengyihao's avatar
dengyihao 已提交
655 656
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);
657

D
dapan1121 已提交
658 659 660 661 662
    id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
    if (id) {
      break;
    }
  }
663 664 665
  return id;
}

H
Haojun Liao 已提交
666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723
#if 0
#include "cJSON.h"
static setConfRet taos_set_config_imp(const char *config){
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
  static bool setConfFlag = false;
  if (setConfFlag) {
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
    strcpy(ret.retMsg, "configuration can only set once");
    return ret;
  }
  taosInitGlobalCfg();
  cJSON *root = cJSON_Parse(config);
  if (root == NULL){
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
    strcpy(ret.retMsg, "parse json error");
    return ret;
  }

  int size = cJSON_GetArraySize(root);
  if(!cJSON_IsObject(root) || size == 0) {
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
    strcpy(ret.retMsg, "json content is invalid, must be not empty object");
    return ret;
  }

  if(size >= 1000) {
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
    strcpy(ret.retMsg, "json object size is too long");
    return ret;
  }

  for(int i = 0; i < size; i++){
    cJSON *item = cJSON_GetArrayItem(root, i);
    if(!item) {
      ret.retCode = SET_CONF_RET_ERR_INNER;
      strcpy(ret.retMsg, "inner error");
      return ret;
    }
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
      ret.retCode = SET_CONF_RET_ERR_PART;
      if (strlen(ret.retMsg) == 0){
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
      }else{
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
        size_t leftSize = tmp >= 0 ? tmp : 0;
        strncat(ret.retMsg, "|",  leftSize);
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
        leftSize = tmp >= 0 ? tmp : 0;
        strncat(ret.retMsg, item->string, leftSize);
      }
    }
  }
  cJSON_Delete(root);
  setConfFlag = true;
  return ret;
}

setConfRet taos_set_config(const char *config){
wafwerar's avatar
wafwerar 已提交
724
  taosThreadMutexLock(&setConfMutex);
H
Haojun Liao 已提交
725
  setConfRet ret = taos_set_config_imp(config);
wafwerar's avatar
wafwerar 已提交
726
  taosThreadMutexUnlock(&setConfMutex);
H
Haojun Liao 已提交
727 728
  return ret;
}
729
#endif