clientEnv.c 25.0 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "catalog.h"
H
Haojun Liao 已提交
17 18
#include "clientInt.h"
#include "clientLog.h"
dengyihao's avatar
dengyihao 已提交
19 20
#include "functionMgt.h"
#include "os.h"
21
#include "query.h"
X
Xiaoyu Wang 已提交
22
#include "qworker.h"
X
Xiaoyu Wang 已提交
23
#include "scheduler.h"
H
Haojun Liao 已提交
24 25
#include "tcache.h"
#include "tglobal.h"
dengyihao's avatar
dengyihao 已提交
26
#include "thttp.h"
dengyihao's avatar
dengyihao 已提交
27
#include "tmsg.h"
H
Haojun Liao 已提交
28 29
#include "tref.h"
#include "trpc.h"
dengyihao's avatar
dengyihao 已提交
30
#include "tsched.h"
H
Haojun Liao 已提交
31
#include "ttime.h"
dengyihao's avatar
dengyihao 已提交
32
#include "tversion.h"
H
Haojun Liao 已提交
33

34 35 36 37
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
#include "cus_name.h"
#endif

H
Haojun Liao 已提交
38
#define TSC_VAR_NOT_RELEASE 1
39
#define TSC_VAR_RELEASED    0
H
Haojun Liao 已提交
40

D
dapan1121 已提交
41
STscDbg  tscDbg = {0};
dengyihao's avatar
dengyihao 已提交
42
SAppInfo appInfo;
D
dapan1121 已提交
43
int64_t  lastClusterId = 0;
L
Liu Jicong 已提交
44
int32_t  clientReqRefPool = -1;
dengyihao's avatar
dengyihao 已提交
45
int32_t  clientConnRefPool = -1;
46
int32_t  clientStop = -1;
H
Haojun Liao 已提交
47

dengyihao's avatar
dengyihao 已提交
48 49
int32_t timestampDeltaLimit = 900;  // s

L
Liu Jicong 已提交
50 51
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t    tscInitRes = 0;
H
Haojun Liao 已提交
52

dengyihao's avatar
dengyihao 已提交
53
// Adds the request to the request ref pool and accounts for it in the
// per-connection counter and, when the connection has an app instance, in the
// app-instance summary counters.
//
// pRequest: request being created; pRequest->self receives the ref id.
// pTscObj:  connection that owns the request.
// Returns TSDB_CODE_SUCCESS, or a non-zero error code when the request could
// not be added to the ref pool.
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
  pRequest->self = taosAddRef(clientReqRefPool, pRequest);
  // NOTE(review): assumes taosAddRef() reports failure with a negative id and
  // sets terrno -- confirm against tref.h.
  int32_t addRefCode = TSDB_CODE_SUCCESS;
  if (pRequest->self < 0) {
    addRefCode = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
  }

  // The counters are bumped even on failure: the caller cleans up through
  // doDestroyRequest(), which runs deregisterRequest() for any non-zero `self`
  // and decrements them again, so the accounting stays balanced.
  int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);

  if (pTscObj->pAppInfo) {
    SAppClusterSummary *pSummary = &pTscObj->pAppInfo->summary;

    // Keep the 64-bit counters 64-bit; narrowing them to int32_t logged wrong
    // values once they exceeded INT32_MAX.
    int64_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
    int64_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
    tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%" PRId64
             ", total:%" PRId64 ", reqId:0x%" PRIx64,
             pRequest->self, pTscObj->id, num, currentInst, total, pRequest->requestId);
  }

  if (addRefCode != TSDB_CODE_SUCCESS) {
    tscError("failed to add request to ref pool, connObj:0x%" PRIx64 ", reqId:0x%" PRIx64, pTscObj->id,
             pRequest->requestId);
    terrno = addRefCode;
    return addRefCode;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
72
// Reverses the accounting of a request that is being destroyed: decrements the
// per-connection and app-instance "current" counters, adds the elapsed time to
// the insert/query totals, emits a slow-log entry when the request exceeded
// tsSlowLogThreshold (seconds), and releases the connection reference held by
// the request.
static void deregisterRequest(SRequestObj *pRequest) {
  if (pRequest == NULL) {
    tscError("pRequest == NULL");
    return;
  }

  STscObj            *pTscObj = pRequest->pTscObj;
  SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;

  // 64-bit counter stays 64-bit so the logged value is not truncated.
  int64_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
  int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
  int32_t reqType = SLOW_LOG_TYPE_OTHERS;

  int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
  tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64
           " elapsed:%.2f ms, "
           "current:%d, app current:%" PRId64,
           pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);

  if (pRequest->pQuery && pRequest->pQuery->pRoot) {
    if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->pQuery->pRoot->type &&
        (0 == ((SVnodeModifyOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) {
      tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
               "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
               duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
               pRequest->metric.planCostUs, pRequest->metric.execCostUs);
      atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_INSERT;
    } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
      tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
               "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
               duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
               pRequest->metric.planCostUs, pRequest->metric.execCostUs);

      atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
      reqType = SLOW_LOG_TYPE_QUERY;
    }
  }

  // Compute the threshold in signed 64-bit microseconds. The previous
  // `tsSlowLogThreshold * 1000000UL` overflowed where unsigned long is 32 bits
  // and, where it is 64 bits, converted a negative duration (clock stepped
  // backwards) into a huge unsigned value that always counted as slow.
  const int64_t slowThresholdUs = (int64_t)tsSlowLogThreshold * 1000000LL;
  if (duration >= slowThresholdUs) {
    atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
    if (tsSlowLogScope & reqType) {
      taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s",
                       taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
                       pRequest->sqlstr);
    }
  }

  // Drops the connection reference taken by createRequest().
  releaseTscObj(pTscObj->id);
}

123
// todo close the transporter properly
D
dapan1121 已提交
124 125
void closeTransporter(SAppInstInfo *pAppInfo) {
  if (pAppInfo == NULL || pAppInfo->pTransporter == NULL) {
H
Haojun Liao 已提交
126 127 128
    return;
  }

D
dapan1121 已提交
129 130
  tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
  rpcClose(pAppInfo->pTransporter);
H
Haojun Liao 已提交
131 132
}

dengyihao's avatar
dengyihao 已提交
133
// Callback installed as SRpcInit.rfp in openTransporter(). Returns true when
// `code` satisfies NEED_REDIRECT_ERROR() and the message is not one of the
// scheduler (TDMT_SCH_*) message types listed below; false otherwise.
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
  if (!NEED_REDIRECT_ERROR(code)) {
    return false;
  }

  const bool isSchedulerMsg = (msgType == TDMT_SCH_QUERY) || (msgType == TDMT_SCH_MERGE_QUERY) ||
                              (msgType == TDMT_SCH_FETCH) || (msgType == TDMT_SCH_MERGE_FETCH) ||
                              (msgType == TDMT_SCH_QUERY_HEARTBEAT) || (msgType == TDMT_SCH_DROP_TASK);
  return !isSchedulerMsg;
}

dengyihao's avatar
dengyihao 已提交
145 146
// start timer for particular msgType
// Returns true only for TDMT_VND_SUBMIT and TDMT_VND_CREATE_TABLE; `code` is
// not consulted.
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
  return (msgType == TDMT_VND_SUBMIT) || (msgType == TDMT_VND_CREATE_TABLE);
}

H
Haojun Liao 已提交
153
// TODO refactor
dengyihao's avatar
dengyihao 已提交
154
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
H
Haojun Liao 已提交
155 156 157 158
  SRpcInit rpcInit;
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = 0;
  rpcInit.label = "TSC";
dengyihao's avatar
dengyihao 已提交
159
  rpcInit.numOfThreads = tsNumOfRpcThreads;
H
Haojun Liao 已提交
160
  rpcInit.cfp = processMsgFromServer;
D
dapan1121 已提交
161
  rpcInit.rfp = clientRpcRfp;
S
Shengliang Guan 已提交
162
  rpcInit.sessions = 1024;
H
Haojun Liao 已提交
163 164
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.user = (char *)user;
S
Shengliang Guan 已提交
165
  rpcInit.idleTime = tsShellActivityTimer * 1000;
dengyihao's avatar
dengyihao 已提交
166
  rpcInit.compressSize = tsCompressMsgSize;
dengyihao's avatar
dengyihao 已提交
167
  rpcInit.dfp = destroyAhandle;
dengyihao's avatar
dengyihao 已提交
168

dengyihao's avatar
dengyihao 已提交
169 170 171
  rpcInit.retryMinInterval = tsRedirectPeriod;
  rpcInit.retryStepFactor = tsRedirectFactor;
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
dengyihao's avatar
dengyihao 已提交
172
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
dengyihao's avatar
dengyihao 已提交
173

dengyihao's avatar
dengyihao 已提交
174
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
dengyihao's avatar
dengyihao 已提交
175
  connLimitNum = TMAX(connLimitNum, 10);
dengyihao's avatar
dengyihao 已提交
176
  connLimitNum = TMIN(connLimitNum, 1000);
dengyihao's avatar
dengyihao 已提交
177
  rpcInit.connLimitNum = connLimitNum;
dengyihao's avatar
dengyihao 已提交
178
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
dengyihao's avatar
dengyihao 已提交
179

dengyihao's avatar
dengyihao 已提交
180 181
  taosVersionStrToInt(version, &(rpcInit.compatibilityVer));

dengyihao's avatar
dengyihao 已提交
182
  void *pDnodeConn = rpcOpen(&rpcInit);
H
Haojun Liao 已提交
183 184 185 186 187 188 189 190
  if (pDnodeConn == NULL) {
    tscError("failed to init connection to server");
    return NULL;
  }

  return pDnodeConn;
}

D
dapan1121 已提交
191
// Walks the hash of request ref ids and, for every id that can still be
// acquired, calls destroyRequest() on the request and then releases the
// reference taken here.
void destroyAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    const int64_t refId = *(int64_t *)pIter;

    SRequestObj *pReq = acquireRequest(refId);
    if (NULL == pReq) {
      continue;
    }

    destroyRequest(pReq);
    releaseRequest(refId);
  }
}

// Walks the hash of request ref ids and calls taos_stop_query() on every
// request that can still be acquired, releasing the reference afterwards.
void stopAllRequests(SHashObj *pRequests) {
  for (void *pIter = taosHashIterate(pRequests, NULL); pIter != NULL; pIter = taosHashIterate(pRequests, pIter)) {
    const int64_t refId = *(int64_t *)pIter;

    SRequestObj *pReq = acquireRequest(refId);
    if (NULL == pReq) {
      continue;
    }

    taos_stop_query(pReq);
    releaseRequest(refId);
  }
}

dengyihao's avatar
dengyihao 已提交
221
// Tears down one app instance: removes its heartbeat manager and its entry in
// appInfo.pInstMap (both under appInfo.mutex), closes its transporter, frees
// its qnode list and finally the instance itself.
//
// pAppInfo: instance to destroy; NULL is ignored.
void destroyAppInst(SAppInstInfo *pAppInfo) {
  if (pAppInfo == NULL) {
    return;
  }

  tscDebug("destroy app inst mgr %p", pAppInfo);

  taosThreadMutexLock(&appInfo.mutex);

  hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
  // instKey is still needed here, so it is freed only after the map removal.
  taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey));

  taosThreadMutexUnlock(&appInfo.mutex);

  taosMemoryFreeClear(pAppInfo->instKey);
  closeTransporter(pAppInfo);

  taosThreadMutexLock(&pAppInfo->qnodeMutex);
  taosArrayDestroy(pAppInfo->pQnodeList);
  pAppInfo->pQnodeList = NULL;
  taosThreadMutexUnlock(&pAppInfo->qnodeMutex);

  // The mutex lives inside the struct that is about to be freed; destroy it
  // first so its resources are not leaked.
  taosThreadMutexDestroy(&pAppInfo->qnodeMutex);

  taosMemoryFree(pAppInfo);
}

H
Haojun Liao 已提交
241
void destroyTscObj(void *pObj) {
D
dapan1121 已提交
242 243 244
  if (NULL == pObj) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
245

H
Haojun Liao 已提交
246
  STscObj *pTscObj = pObj;
dengyihao's avatar
dengyihao 已提交
247
  int64_t  tscId = pTscObj->id;
D
dapan1121 已提交
248
  tscTrace("begin to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
H
Haojun Liao 已提交
249

D
dapan1121 已提交
250
  SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
K
kailixu 已提交
251
  hbDeregisterConn(pTscObj, connKey);
252

D
dapan1121 已提交
253
  destroyAllRequests(pTscObj->pRequests);
254
  taosHashCleanup(pTscObj->pRequests);
dengyihao's avatar
dengyihao 已提交
255

D
dapan1121 已提交
256
  schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
dengyihao's avatar
dengyihao 已提交
257
  tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
D
dapan1121 已提交
258 259
           pTscObj->pAppInfo->numOfConns);

H
Haojun Liao 已提交
260
  // In any cases, we should not free app inst here. Or an race condition rises.
dengyihao's avatar
dengyihao 已提交
261
  /*int64_t connNum = */ atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
262

wafwerar's avatar
wafwerar 已提交
263
  taosThreadMutexDestroy(&pTscObj->mutex);
D
dapan1121 已提交
264
  taosMemoryFree(pTscObj);
D
dapan1121 已提交
265

D
dapan1121 已提交
266
  tscTrace("end to destroy tscObj %" PRIx64 " p:%p", tscId, pTscObj);
H
Haojun Liao 已提交
267 268
}

269
// Allocates and initialises a connection object, adds it to clientConnRefPool
// and increments the app instance's connection counter.
//
// user:     copied into the object's user field (bounded by its size).
// auth:     TSDB_PASSWORD_LEN bytes are copied into the object's pass field.
// db:       optional; copied into the object's db field when not NULL.
// connType: stored as the connection type.
// pAppInfo: app instance the connection belongs to.
// Returns the new object, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when
// an allocation fails.
void *createTscObj(const char *user, const char *auth, const char *db, int32_t connType, SAppInstInfo *pAppInfo) {
  STscObj *pConn = (STscObj *)taosMemoryCalloc(1, sizeof(STscObj));
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pConn->pRequests = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (pConn->pRequests == NULL) {
    taosMemoryFree(pConn);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pConn->connType = connType;
  pConn->pAppInfo = pAppInfo;

  tstrncpy(pConn->user, user, sizeof(pConn->user));
  memcpy(pConn->pass, auth, TSDB_PASSWORD_LEN);
  if (NULL != db) {
    tstrncpy(pConn->db, db, tListLen(pConn->db));
  }

  taosThreadMutexInit(&pConn->mutex, NULL);
  pConn->id = taosAddRef(clientConnRefPool, pConn);

  atomic_add_fetch_64(&pConn->pAppInfo->numOfConns, 1);

  tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pConn->id, pConn);
  return pConn;
}

L
Liu Jicong 已提交
301
STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
D
dapan1121 已提交
302

L
Liu Jicong 已提交
303
// Releases ref id `rid` in clientConnRefPool; returns taosReleaseRef()'s result.
int32_t releaseTscObj(int64_t rid) {
  int32_t code = taosReleaseRef(clientConnRefPool, rid);
  return code;
}
D
dapan1121 已提交
304

dengyihao's avatar
dengyihao 已提交
305
void *createRequest(uint64_t connId, int32_t type, int64_t reqid) {
wafwerar's avatar
wafwerar 已提交
306
  SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
H
Haojun Liao 已提交
307
  if (NULL == pRequest) {
S
Shengliang Guan 已提交
308
    terrno = TSDB_CODE_OUT_OF_MEMORY;
H
Haojun Liao 已提交
309 310 311
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
312
  STscObj *pTscObj = acquireTscObj(connId);
313
  if (pTscObj == NULL) {
314
    taosMemoryFree(pRequest);
315 316 317 318
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

L
Liu Jicong 已提交
319
  pRequest->resType = RES_TYPE__QUERY;
dengyihao's avatar
dengyihao 已提交
320
  pRequest->requestId = reqid == 0 ? generateRequestId() : reqid;
D
dapan1121 已提交
321
  pRequest->metric.start = taosGetTimestampUs();
H
Haojun Liao 已提交
322

323
  pRequest->body.resInfo.convertUcs4 = true;  // convert ucs4 by default
dengyihao's avatar
dengyihao 已提交
324
  pRequest->type = type;
325
  pRequest->allocatorRefId = -1;
326 327 328 329

  pRequest->pDb = getDbOfConnection(pTscObj);
  pRequest->pTscObj = pTscObj;

wafwerar's avatar
wafwerar 已提交
330
  pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
D
stmt  
dapan1121 已提交
331
  pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
H
Haojun Liao 已提交
332 333
  tsem_init(&pRequest->body.rspSem, 0, 0);

334
  if (registerRequest(pRequest, pTscObj)) {
D
dapan1121 已提交
335 336 337
    doDestroyRequest(pRequest);
    return NULL;
  }
L
Liu Jicong 已提交
338

H
Haojun Liao 已提交
339 340 341
  return pRequest;
}

L
Liu Jicong 已提交
342
// Frees every heap buffer referenced by a result-info struct and resets the
// pointers to NULL. The struct itself is not freed.
void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
  taosMemoryFreeClear(pResInfo->pRspMsg);
  taosMemoryFreeClear(pResInfo->length);
  taosMemoryFreeClear(pResInfo->row);
  taosMemoryFreeClear(pResInfo->pCol);
  taosMemoryFreeClear(pResInfo->fields);
  taosMemoryFreeClear(pResInfo->userFields);
  taosMemoryFreeClear(pResInfo->convertJson);

  if (NULL == pResInfo->convertBuf) {
    return;
  }

  // convertBuf is an array with one buffer per column
  for (int32_t col = 0; col < pResInfo->numOfCols; ++col) {
    taosMemoryFreeClear(pResInfo->convertBuf[col]);
  }
  taosMemoryFreeClear(pResInfo->convertBuf);
}

D
dapan1121 已提交
359 360 361 362 363 364
SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }

// Releases ref id `rid` in clientReqRefPool; returns taosReleaseRef()'s result.
int32_t releaseRequest(int64_t rid) {
  int32_t code = taosReleaseRef(clientReqRefPool, rid);
  return code;
}

// Removes ref id `rid` from clientReqRefPool; returns taosRemoveRef()'s result.
int32_t removeRequest(int64_t rid) {
  int32_t code = taosRemoveRef(clientReqRefPool, rid);
  return code;
}

365
// Removes from the request ref pool every request linked to `pRequest` through
// relation.prevRefId / relation.nextRefId. Does nothing when `pRequest` is
// itself a sub request (relation.userRefId is set and differs from its own id).
void destroySubRequests(SRequestObj *pRequest) {
  SRequestObj  *pReqList[16] = {NULL};
  const int32_t maxPrev = (int32_t)(sizeof(pReqList) / sizeof(pReqList[0]));
  int32_t       numPrev = 0;
  uint64_t      tmpRefId = 0;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  // Walk the "prev" chain. Each collected request stays acquired until it has
  // been removed below, so the stored pointers remain valid.
  SRequestObj *pCur = pRequest;
  while (pCur->relation.prevRefId) {
    if (numPrev >= maxPrev) {
      // pReqList is full; stop instead of writing past its end.
      tscError("0x%" PRIx64 ", too many prev requests, only the first %d are removed, reqId:0x%" PRIx64,
               pRequest->self, maxPrev, pRequest->requestId);
      break;
    }

    tmpRefId = pCur->relation.prevRefId;
    SRequestObj *pPrev = acquireRequest(tmpRefId);
    if (pPrev == NULL) {
      // Log through pCur (the last request that was still there); the previous
      // code dereferenced the NULL result of acquireRequest() here.
      tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pCur->self, tmpRefId,
               pCur->requestId);
      break;
    }

    pReqList[numPrev++] = pPrev;
    pCur = pPrev;
  }

  // Remove in reverse collection order, then drop the reference held above.
  for (int32_t i = numPrev - 1; i >= 0; i--) {
    int64_t rid = pReqList[i]->self;
    removeRequest(rid);
    releaseRequest(rid);
  }

  // Walk the "next" chain.
  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    SRequestObj *pNext = acquireRequest(tmpRefId);
    if (pNext == NULL) {
      tscError("0x%" PRIx64 " is not there", tmpRefId);
      break;
    }

    // Read the link and the id before the request can be destroyed.
    int64_t rid = pNext->self;
    tmpRefId = pNext->relation.nextRefId;
    removeRequest(rid);
    releaseRequest(rid);
  }
}

D
dapan1121 已提交
406
void doDestroyRequest(void *p) {
D
dapan1121 已提交
407 408 409
  if (NULL == p) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
410

dengyihao's avatar
dengyihao 已提交
411
  SRequestObj *pRequest = (SRequestObj *)p;
dengyihao's avatar
dengyihao 已提交
412

D
dapan1121 已提交
413
  uint64_t reqId = pRequest->requestId;
D
dapan1121 已提交
414
  tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
dengyihao's avatar
dengyihao 已提交
415

416
  destroySubRequests(pRequest);
dengyihao's avatar
dengyihao 已提交
417

D
dapan1121 已提交
418
  taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
L
Liu Jicong 已提交
419

D
dapan1121 已提交
420
  schedulerFreeJob(&pRequest->body.queryJob, 0);
D
dapan1121 已提交
421

422 423
  destorySqlCallbackWrapper(pRequest->pWrapper);

wafwerar's avatar
wafwerar 已提交
424 425
  taosMemoryFreeClear(pRequest->msgBuf);
  taosMemoryFreeClear(pRequest->pDb);
H
Haojun Liao 已提交
426

H
Haojun Liao 已提交
427
  doFreeReqResultInfo(&pRequest->body.resInfo);
428
  tsem_destroy(&pRequest->body.rspSem);
H
Haojun Liao 已提交
429

X
Xiaoyu Wang 已提交
430 431
  taosArrayDestroy(pRequest->tableList);
  taosArrayDestroy(pRequest->dbList);
D
dapan1121 已提交
432
  taosArrayDestroy(pRequest->targetTableList);
X
Xiaoyu Wang 已提交
433

D
dapan1121 已提交
434
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
D
dapan1121 已提交
435

D
dapan1121 已提交
436 437 438
  if (pRequest->self) {
    deregisterRequest(pRequest);
  }
D
dapan1121 已提交
439

H
Haojun Liao 已提交
440
  if (pRequest->syncQuery) {
dengyihao's avatar
dengyihao 已提交
441 442 443
    if (pRequest->body.param) {
      tsem_destroy(&((SSyncQueryParam *)pRequest->body.param)->sem);
    }
H
Haojun Liao 已提交
444 445 446
    taosMemoryFree(pRequest->body.param);
  }

447 448 449
  qDestroyQuery(pRequest->pQuery);
  nodesDestroyAllocator(pRequest->allocatorRefId);

D
dapan1121 已提交
450
  taosMemoryFreeClear(pRequest->sqlstr);
451
  taosMemoryFree(pRequest);
D
dapan1121 已提交
452
  tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
H
Haojun Liao 已提交
453 454
}

dengyihao's avatar
dengyihao 已提交
455
// Stops the request's query via taos_stop_query() and removes the request from
// the request ref pool. NULL is ignored.
void destroyRequest(SRequestObj *pRequest) {
  if (pRequest != NULL) {
    taos_stop_query(pRequest);
    removeRequest(pRequest->self);
  }
}

464 465 466 467 468 469 470 471 472 473 474 475 476 477
// Marks the request as killed and, when it carries a query whose execMode is
// QUERY_EXEC_MODE_SCHEDULE, frees its scheduler job with
// TSDB_CODE_TSC_QUERY_KILLED.
void taosStopQueryImpl(SRequestObj *pRequest) {
  pRequest->killed = true;

  // It is not a query, no need to stop.
  const bool isScheduledQuery =
      (pRequest->pQuery != NULL) && (QUERY_EXEC_MODE_SCHEDULE == pRequest->pQuery->execMode);
  if (!isScheduledQuery) {
    tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId);
    return;
  }

  schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
  tscDebug("request %" PRIx64 " killed", pRequest->requestId);
}

// Stops `pRequest` and every request linked to it through relation.prevRefId /
// relation.nextRefId by calling taosStopQueryImpl() on each. Does nothing when
// `pRequest` is itself a sub request (relation.userRefId is set and differs
// from its own id).
void stopAllQueries(SRequestObj *pRequest) {
  SRequestObj  *pReqList[16] = {NULL};
  const int32_t maxPrev = (int32_t)(sizeof(pReqList) / sizeof(pReqList[0]));
  int32_t       numPrev = 0;
  uint64_t      tmpRefId = 0;

  if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
    return;
  }

  // Walk the "prev" chain. Each collected request stays acquired until it has
  // been stopped below, so the stored pointers remain valid.
  SRequestObj *pCur = pRequest;
  while (pCur->relation.prevRefId) {
    if (numPrev >= maxPrev) {
      // pReqList is full; stop instead of writing past its end.
      tscError("0x%" PRIx64 ", too many prev requests, only the first %d are stopped, reqId:0x%" PRIx64,
               pRequest->self, maxPrev, pRequest->requestId);
      break;
    }

    tmpRefId = pCur->relation.prevRefId;
    SRequestObj *pPrev = acquireRequest(tmpRefId);
    if (pPrev == NULL) {
      // Log through pCur (the last request that was still there); the previous
      // code dereferenced the NULL result of acquireRequest() here.
      tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pCur->self, tmpRefId,
               pCur->requestId);
      break;
    }

    pReqList[numPrev++] = pPrev;
    pCur = pPrev;
  }

  // Stop in reverse collection order, then drop the reference held above.
  for (int32_t i = numPrev - 1; i >= 0; i--) {
    int64_t rid = pReqList[i]->self;
    taosStopQueryImpl(pReqList[i]);
    releaseRequest(rid);
  }

  taosStopQueryImpl(pRequest);

  // Walk the "next" chain.
  tmpRefId = pRequest->relation.nextRefId;
  while (tmpRefId) {
    SRequestObj *pNext = acquireRequest(tmpRefId);
    if (pNext == NULL) {
      tscError("0x%" PRIx64 " is not there", tmpRefId);
      break;
    }

    tmpRefId = pNext->relation.nextRefId;
    taosStopQueryImpl(pNext);
    releaseRequest(pNext->self);
  }
}

D
dapan 已提交
520 521
// Resets clientStop to -1. On Windows builds the crash-report thread registers
// this with atexit() when taosCheckCurrentInDll() is true.
void crashReportThreadFuncUnexpectedStopped(void) {
  atomic_store_32(&clientStop, -1);
}

D
dapan1121 已提交
522 523 524 525
static void *tscCrashReportThreadFp(void *param) {
  setThreadName("client-crashReport");
  char filepath[PATH_MAX] = {0};
  snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
dengyihao's avatar
dengyihao 已提交
526 527
  char     *pMsg = NULL;
  int64_t   msgLen = 0;
D
dapan1121 已提交
528
  TdFilePtr pFile = NULL;
dengyihao's avatar
dengyihao 已提交
529 530 531 532
  bool      truncateFile = false;
  int32_t   sleepTime = 200;
  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
  int32_t   loopTimes = reportPeriodNum;
D
dapan 已提交
533 534 535 536 537 538

#ifdef WINDOWS
  if (taosCheckCurrentInDll()) {
    atexit(crashReportThreadFuncUnexpectedStopped);
  }
#endif
dengyihao's avatar
dengyihao 已提交
539

540 541 542 543
  if (-1 != atomic_val_compare_exchange_32(&clientStop, -1, 0)) {
    return NULL;
  }

D
dapan1121 已提交
544
  while (1) {
545
    if (clientStop > 0) break;
D
dapan1121 已提交
546 547 548 549 550 551 552 553 554 555 556
    if (loopTimes++ < reportPeriodNum) {
      taosMsleep(sleepTime);
      continue;
    }

    taosReadCrashInfo(filepath, &pMsg, &msgLen, &pFile);
    if (pMsg && msgLen > 0) {
      if (taosSendHttpReport(tsTelemServer, tsClientCrashReportUri, tsTelemPort, pMsg, msgLen, HTTP_FLAT) != 0) {
        tscError("failed to send crash report");
        if (pFile) {
          taosReleaseCrashLogFile(pFile, false);
D
dapan1121 已提交
557
          pFile = NULL;
D
dapan1121 已提交
558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573
          continue;
        }
      } else {
        tscInfo("succeed to send crash report");
        truncateFile = true;
      }
    } else {
      tscDebug("no crash info");
    }

    taosMemoryFree(pMsg);

    if (pMsg && msgLen > 0) {
      pMsg = NULL;
      continue;
    }
dengyihao's avatar
dengyihao 已提交
574

D
dapan1121 已提交
575 576
    if (pFile) {
      taosReleaseCrashLogFile(pFile, truncateFile);
D
dapan1121 已提交
577
      pFile = NULL;
D
dapan1121 已提交
578 579
      truncateFile = false;
    }
dengyihao's avatar
dengyihao 已提交
580

D
dapan1121 已提交
581 582 583 584
    taosMsleep(sleepTime);
    loopTimes = 0;
  }

585
  clientStop = -2;
D
dapan1121 已提交
586 587 588 589 590 591 592
  return NULL;
}

int32_t tscCrashReportInit() {
  if (!tsEnableCrashReport) {
    return 0;
  }
dengyihao's avatar
dengyihao 已提交
593

D
dapan1121 已提交
594 595 596
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
dengyihao's avatar
dengyihao 已提交
597
  TdThread crashReportThread;
D
dapan1121 已提交
598 599 600 601 602 603 604 605 606 607 608 609 610 611
  if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
    tscError("failed to create crashReport thread since %s", strerror(errno));
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  return 0;
}

void tscStopCrashReport() {
  if (!tsEnableCrashReport) {
    return;
  }

D
dapan 已提交
612 613 614 615 616
  if (atomic_val_compare_exchange_32(&clientStop, 0, 1)) {
    tscDebug("hb thread already stopped");
    return;
  }

D
dapan1121 已提交
617 618 619 620 621
  while (atomic_load_32(&clientStop) > 0) {
    taosMsleep(100);
  }
}

D
dapan1121 已提交
622
void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
dengyihao's avatar
dengyihao 已提交
623
  char       *pMsg = NULL;
D
dapan1121 已提交
624 625 626
  const char *flags = "UTL FATAL ";
  ELogLevel   level = DEBUG_FATAL;
  int32_t     dflag = 255;
dengyihao's avatar
dengyihao 已提交
627
  int64_t     msgLen = -1;
D
dapan1121 已提交
628 629 630 631 632

  if (tsEnableCrashReport) {
    if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) {
      taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
    } else {
dengyihao's avatar
dengyihao 已提交
633
      msgLen = strlen(pMsg);
D
dapan1121 已提交
634 635 636 637
    }
  }

  taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
D
dapan1121 已提交
638 639
}

H
Haojun Liao 已提交
640
void taos_init_imp(void) {
D
dapan1121 已提交
641 642 643 644 645 646
#if defined(LINUX)
  if (tscDbg.memEnable) {
    int32_t code = taosMemoryDbgInit();
    if (code) {
      printf("failed to init memory dbg, error:%s\n", tstrerror(code));
    } else {
dengyihao's avatar
dengyihao 已提交
647
      tsAsyncLog = false;
D
dapan1121 已提交
648 649 650 651 652
      printf("memory dbg enabled\n");
    }
  }
#endif

H
Haojun Liao 已提交
653 654 655 656
  // In the APIs of other program language, taos_cleanup is not available yet.
  // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
  atexit(taos_cleanup);
  errno = TSDB_CODE_SUCCESS;
wafwerar's avatar
wafwerar 已提交
657
  taosSeedRand(taosGetTimestampSec());
H
Haojun Liao 已提交
658

D
dapan1121 已提交
659 660 661 662
  appInfo.pid = taosGetPId();
  appInfo.startTime = taosGetTimestampMs();
  appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);

H
Haojun Liao 已提交
663
  deltaToUtcInitOnce();
S
Shengliang Guan 已提交
664

665 666 667 668 669 670 671
  char logDirName[64] = {0};
#ifdef CUS_PROMPT
  snprintf(logDirName, 64, "%slog", CUS_PROMPT);
#else
  snprintf(logDirName, 64, "taoslog");
#endif
  if (taosCreateLog(logDirName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
A
Alex Duan 已提交
672
    // ignore create log failed, only print
673
    printf(" WARING: Create %s failed:%s. configDir=%s\n", logDirName, strerror(errno), configDir);
S
Shengliang Guan 已提交
674 675
  }

wafwerar's avatar
wafwerar 已提交
676
  if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
S
Shengliang Guan 已提交
677 678 679 680
    tscInitRes = -1;
    return;
  }

681
  initQueryModuleMsgHandle();
H
Haojun Liao 已提交
682

S
Shengliang Guan 已提交
683
  if (taosConvInit() != 0) {
684 685
    tscError("failed to init conv");
    return;
S
Shengliang Guan 已提交
686
  }
dengyihao's avatar
dengyihao 已提交
687

S
Shengliang Guan 已提交
688
  rpcInit();
H
Haojun Liao 已提交
689

D
dapan1121 已提交
690
  SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
691 692
  catalogInit(&cfg);

D
dapan1121 已提交
693
  schedulerInit();
S
Shengliang Guan 已提交
694
  tscDebug("starting to initialize TAOS driver");
H
Haojun Liao 已提交
695

wafwerar's avatar
wafwerar 已提交
696
#ifndef WINDOWS
H
Haojun Liao 已提交
697
  taosSetCoreDump(true);
wafwerar's avatar
wafwerar 已提交
698
#endif
H
Haojun Liao 已提交
699 700

  initTaskQueue();
701
  fmFuncMgtInit();
702
  nodesInitAllocatorSet();
H
Haojun Liao 已提交
703

704
  clientConnRefPool = taosOpenRef(200, destroyTscObj);
dengyihao's avatar
dengyihao 已提交
705
  clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
H
Haojun Liao 已提交
706

dengyihao's avatar
dengyihao 已提交
707
  // transDestroyBuffer(&conn->readBuf);
H
Haojun Liao 已提交
708
  taosGetAppName(appInfo.appName, NULL);
wafwerar's avatar
wafwerar 已提交
709
  taosThreadMutexInit(&appInfo.mutex, NULL);
H
Haojun Liao 已提交
710

D
dapan1121 已提交
711
  tscCrashReportInit();
dengyihao's avatar
dengyihao 已提交
712

H
Haojun Liao 已提交
713 714 715 716
  tscDebug("client is initialized successfully");
}

int taos_init() {
wafwerar's avatar
wafwerar 已提交
717
  taosThreadOnce(&tscinit, taos_init_imp);
H
Haojun Liao 已提交
718 719 720 721
  return tscInitRes;
}

/**
 * Apply a single client option.
 *
 * TSDB_OPTION_CONFIGDIR only copies `str` into the global configDir buffer and
 * returns immediately, without initializing the driver. Every other option
 * first runs taos_init(), maps the option to a config item name, writes the
 * value through cfgSetItem(), and, for the shell activity timer and the
 * adapter switch, additionally calls taosApplyLocalCfg() on that item.
 *
 * @param option  which option to set
 * @param str     new value in text form; must not be NULL
 * @return 0 on success; -1 when `str` is NULL or the option is not supported;
 *         otherwise the non-zero code returned by taos_init(), cfgSetItem()
 *         or taosApplyLocalCfg()
 */
int taos_options_imp(TSDB_OPTION option, const char *str) {
  // Both tstrncpy() and cfgSetItem() below read through `str`.
  if (str == NULL) {
    tscError("failed to set option %d since value is NULL", option);
    return -1;
  }

  if (option == TSDB_OPTION_CONFIGDIR) {
    tstrncpy(configDir, str, PATH_MAX);
    tscInfo("set cfg:%s to %s", configDir, str);
    return 0;
  }

  // initialize global config; do not touch the config if initialization failed
  int code = taos_init();
  if (code != 0) {
    tscError("failed to set option %d since client initialization failed", option);
    return code;
  }

  SConfig     *pCfg = taosGetCfg();
  SConfigItem *pItem = NULL;

  switch (option) {
    case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
      pItem = cfgGetItem(pCfg, "shellActivityTimer");
      break;
    case TSDB_OPTION_LOCALE:
      pItem = cfgGetItem(pCfg, "locale");
      break;
    case TSDB_OPTION_CHARSET:
      pItem = cfgGetItem(pCfg, "charset");
      break;
    case TSDB_OPTION_TIMEZONE:
      pItem = cfgGetItem(pCfg, "timezone");
      break;
    case TSDB_OPTION_USE_ADAPTER:
      pItem = cfgGetItem(pCfg, "useAdapter");
      break;
    default:
      break;
  }

  // Unknown option, or the item is not present in the loaded config.
  if (pItem == NULL) {
    tscError("Invalid option %d", option);
    return -1;
  }

  code = cfgSetItem(pCfg, pItem->name, str, CFG_STYPE_TAOS_OPTIONS);
  if (code != 0) {
    tscError("failed to set cfg:%s to %s since %s", pItem->name, str, terrstr());
  } else {
    tscInfo("set cfg:%s to %s", pItem->name, str);
    // Only these two options are pushed through taosApplyLocalCfg() here.
    if (TSDB_OPTION_SHELL_ACTIVITY_TIMER == option || TSDB_OPTION_USE_ADAPTER == option) {
      code = taosApplyLocalCfg(pCfg, pItem->name);
    }
  }

  return code;
}

771 772 773 774 775
/**
 * The request id is an unsigned integer format of 64bit.
 *+------------+-----+-----------+---------------+
 *| uid|localIp| PId | timestamp | serial number |
 *+------------+-----+-----------+---------------+
776
 *| 12bit      |12bit|24bit      |16bit          |
777 778 779 780
 *+------------+-----+-----------+---------------+
 * @return
 */
uint64_t generateRequestId() {
H
Haojun Liao 已提交
781
  static uint64_t hashId = 0;
dengyihao's avatar
dengyihao 已提交
782
  static uint32_t requestSerialId = 0;
H
Haojun Liao 已提交
783 784 785 786 787 788 789 790 791 792 793

  if (hashId == 0) {
    char    uid[64] = {0};
    int32_t code = taosGetSystemUUID(uid, tListLen(uid));
    if (code != TSDB_CODE_SUCCESS) {
      tscError("Failed to get the system uid to generated request id, reason:%s. use ip address instead",
               tstrerror(TAOS_SYSTEM_ERROR(errno)));

    } else {
      hashId = MurmurHash3_32(uid, strlen(uid));
    }
794 795
  }

D
dapan1121 已提交
796
  uint64_t id = 0;
L
Liu Jicong 已提交
797

D
dapan1121 已提交
798 799 800
  while (true) {
    int64_t  ts = taosGetTimestampMs();
    uint64_t pid = taosGetPId();
dengyihao's avatar
dengyihao 已提交
801 802
    uint32_t val = atomic_add_fetch_32(&requestSerialId, 1);
    if (val >= 0xFFFF) atomic_store_32(&requestSerialId, 0);
803

D
dapan1121 已提交
804 805 806 807 808
    id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
    if (id) {
      break;
    }
  }
809 810 811
  return id;
}

H
Haojun Liao 已提交
812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869
#if 0
#include "cJSON.h"
/*
 * Apply client options supplied as a JSON object of name/value pairs.
 *
 * The configuration may be applied only once per process: after a call runs
 * to completion (even with per-option failures), later calls return
 * SET_CONF_RET_ERR_ONLY_ONCE. Each member of the object is passed to
 * taosReadConfigOption(); the names of the members that fail are collected in
 * ret.retMsg separated by '|', and the result code becomes
 * SET_CONF_RET_ERR_PART.
 *
 * The parsed JSON tree is released on every path that leaves the function
 * after a successful cJSON_Parse().
 *
 * config: JSON text to parse
 * returns: a setConfRet carrying the result code and an optional message
 */
static setConfRet taos_set_config_imp(const char *config){
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
  static bool setConfFlag = false;
  if (setConfFlag) {
    ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
    strcpy(ret.retMsg, "configuration can only set once");
    return ret;
  }
  taosInitGlobalCfg();
  cJSON *root = cJSON_Parse(config);
  if (root == NULL){
    ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
    strcpy(ret.retMsg, "parse json error");
    return ret;
  }

  int size = cJSON_GetArraySize(root);
  if(!cJSON_IsObject(root) || size == 0) {
    ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
    strcpy(ret.retMsg, "json content is invalid, must be not empty object");
    cJSON_Delete(root);  // do not leak the parsed tree on rejection
    return ret;
  }

  if(size >= 1000) {
    ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
    strcpy(ret.retMsg, "json object size is too long");
    cJSON_Delete(root);
    return ret;
  }

  for(int i = 0; i < size; i++){
    cJSON *item = cJSON_GetArrayItem(root, i);
    if(!item) {
      ret.retCode = SET_CONF_RET_ERR_INNER;
      strcpy(ret.retMsg, "inner error");
      cJSON_Delete(root);
      return ret;
    }
    if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
      ret.retCode = SET_CONF_RET_ERR_PART;
      if (strlen(ret.retMsg) == 0){
        snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
      }else{
        // Append "|<name>" while never writing past RET_MSG_LENGTH - 1 characters.
        int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
        size_t leftSize = tmp >= 0 ? tmp : 0;
        strncat(ret.retMsg, "|",  leftSize);
        tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
        leftSize = tmp >= 0 ? tmp : 0;
        strncat(ret.retMsg, item->string, leftSize);
      }
    }
  }
  cJSON_Delete(root);
  setConfFlag = true;
  return ret;
}

setConfRet taos_set_config(const char *config){
wafwerar's avatar
wafwerar 已提交
870
  taosThreadMutexLock(&setConfMutex);
H
Haojun Liao 已提交
871
  setConfRet ret = taos_set_config_imp(config);
wafwerar's avatar
wafwerar 已提交
872
  taosThreadMutexUnlock(&setConfMutex);
H
Haojun Liao 已提交
873 874
  return ret;
}
875
#endif