clientImpl.c 15.6 KB
Newer Older
1

2
#include "clientInt.h"
3
#include "clientLog.h"
4 5
#include "tdef.h"
#include "tep.h"
6
#include "tglobal.h"
7
#include "tmsgtype.h"
8 9
#include "tnote.h"
#include "tpagedfile.h"
10
#include "tref.h"
11
#include "parser.h"
X
Xiaoyu Wang 已提交
12 13 14 15 16 17 18 19 20 21 22
#include "planner.h"
#include "scheduler.h"

#define CHECK_CODE_GOTO(expr, lable) \
  do { \
    int32_t code = expr; \
    if (TSDB_CODE_SUCCESS != code) { \
      terrno = code; \
      goto lable; \
    } \
  } while (0)
23

24 25
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody);
H
Haojun Liao 已提交
26
static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody);
H
Haojun Liao 已提交
27

28
static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId);
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47

static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (str == NULL) {
    return false;
  }

  size_t len = strlen(str);
  if (len <= 0 || len > maxsize) {
    return false;
  }

  return true;
}

static bool validateUserName(const char* user) {
  return stringLengthCheck(user, TSDB_USER_LEN - 1);
}

static bool validatePassword(const char* passwd) {
48
  return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
49 50 51 52 53 54
}

static bool validateDbName(const char* db) {
  return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1);
}

55 56 57 58 59 60 61
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
  return strdup(key);
}

static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
62 63

TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
64 65 66 67
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

  char tmp[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL) {
    if(!validateDbName(db)) {
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

    tstrncpy(tmp, db, sizeof(tmp));
    strdequote(tmp);
  }

84
  char secretEncrypt[32] = {0};
85 86 87 88 89 90
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

91
    taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
92 93 94 95
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

96
  SCorEpSet epSet = {0};
97 98 99 100 101 102 103 104 105 106 107 108 109 110
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.port[0] = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }

111 112
  char* key = getClusterKey(user, secretEncrypt, ip, port);

H
Haojun Liao 已提交
113
  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
114
  if (pInst == NULL) {
H
Haojun Liao 已提交
115 116
    SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
    p->mgmtEp       = epSet;
H
Haojun Liao 已提交
117
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
H
Haojun Liao 已提交
118
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
119

H
Haojun Liao 已提交
120
    pInst = &p;
121 122
  }

H
Haojun Liao 已提交
123 124
  tfree(key);
  return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst);
125 126
}

X
Xiaoyu Wang 已提交
127 128 129
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
130
    tscError("failed to malloc sqlObj");
X
Xiaoyu Wang 已提交
131
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
132 133
  }

X
Xiaoyu Wang 已提交
134 135 136 137 138
  (*pRequest)->sqlstr = malloc(sqlLen + 1);
  if ((*pRequest)->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self);
    (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
139 140
  }

X
Xiaoyu Wang 已提交
141 142 143
  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;
144

X
Xiaoyu Wang 已提交
145 146 147
  tscDebugL("0x%"PRIx64" SQL: %s", (*pRequest)->requestId, (*pRequest)->sqlstr);
  return TSDB_CODE_SUCCESS;
}
148

X
Xiaoyu Wang 已提交
149
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
X
Xiaoyu Wang 已提交
150
  SParseContext cxt = {
X
Xiaoyu Wang 已提交
151
    .ctx = {.requestId = pRequest->requestId, .acctId = pRequest->pTscObj->acctId, .db = getConnectionDB(pRequest->pTscObj)},
X
Xiaoyu Wang 已提交
152
    .pSql = pRequest->sqlstr,
X
Xiaoyu Wang 已提交
153
    .sqlLen = pRequest->sqlLen,
X
Xiaoyu Wang 已提交
154 155 156
    .pMsg = pRequest->msgBuf,
    .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
  };
X
Xiaoyu Wang 已提交
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
  int32_t code = qParseQuerySql(&cxt, pQuery);
  tfree(cxt.ctx.db);
  return code;
}

int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
  SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
  pRequest->type = pDcl->msgType;
  pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen};

  SRequestMsgBody body = buildRequestMsgImpl(pRequest);
  SEpSet* pEpSet = &pRequest->pTscObj->pAppInfo->mgmtEp.epSet;

  if (pDcl->msgType == TSDB_MSG_TYPE_CREATE_TABLE) {
    struct SCatalog* pCatalog = NULL;

    char buf[12] = {0};
    sprintf(buf, "%d", pRequest->pTscObj->pAppInfo->clusterId);
    int32_t code = catalogGetHandle(buf, &pCatalog);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SCreateTableMsg* pMsg = body.msgInfo.pMsg;

    SName t = {0};
    tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);

    char db[TSDB_DB_NAME_LEN + TS_PATH_DELIMITER_LEN + TSDB_ACCT_ID_LEN] = {0};
    tNameGetFullDbName(&t, db);

    SVgroupInfo info = {0};
    catalogGetTableHashVgroup(pCatalog, pRequest->pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info);

    int64_t transporterId = 0;
    SEpSet ep = {0};
    ep.inUse = info.inUse;
    ep.numOfEps = info.numOfEps;
    for(int32_t i = 0; i < ep.numOfEps; ++i) {
      ep.port[i] = info.epAddr[i].port;
      tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
198
    }
X
Xiaoyu Wang 已提交
199

X
Xiaoyu Wang 已提交
200 201 202 203
    sendMsgToServer(pRequest->pTscObj->pTransporter, &ep, &body, &transporterId);
  } else {
    int64_t transporterId = 0;
    sendMsgToServer(pRequest->pTscObj->pTransporter, pEpSet, &body, &transporterId);
204 205
  }

X
Xiaoyu Wang 已提交
206 207 208 209
  tsem_wait(&pRequest->body.rspSem);
  destroyRequestMsgBody(&body);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
210

X
Xiaoyu Wang 已提交
211 212 213 214 215 216 217 218 219 220
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
  return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
}

TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
  STscObj *pTscObj = (STscObj *)taos;
  if (sqlLen > (size_t) tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
221 222
  }

X
Xiaoyu Wang 已提交
223 224 225 226 227 228 229
  nPrintTsc("%s", sql)

  SRequestObj* pRequest = NULL;
  SQueryNode* pQuery = NULL;
  SQueryDag* pDag = NULL;
  void* pJob = NULL;

X
Xiaoyu Wang 已提交
230
  terrno = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
231 232 233 234 235 236 237 238 239 240 241 242
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
  if (qIsDdlQuery(pQuery)) {
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
    goto _return;
  }
  CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);
  CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return);

_return:
  qDestoryQuery(pQuery);
  qDestroyQueryDag(pDag);
X
Xiaoyu Wang 已提交
243
  if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
X
Xiaoyu Wang 已提交
244 245
    pRequest->code = terrno;
  }
246 247 248
  return pRequest;
}

249
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
250 251
  pEpSet->version = 0;

H
Haojun Liao 已提交
252
  // init mnode ip set
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
  SEpSet *mgmtEpSet   = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse    = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(firstEp, mgmtEpSet->fqdn[0], &(mgmtEpSet->port[0]));
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(secondEp, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
    mgmtEpSet->numOfEps++;
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

285
STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
286
  STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
287 288 289 290 291
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

292
  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_MSG_TYPE_CONNECT);
293 294 295
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
296 297 298
    return NULL;
  }

299 300 301 302 303 304 305
  SRequestMsgBody body = {0};
  buildConnectMsg(pRequest, &body);

  int64_t transporterId = 0;
  sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);

  tsem_wait(&pRequest->body.rspSem);
H
Haojun Liao 已提交
306
  destroyRequestMsgBody(&body);
H
Haojun Liao 已提交
307

308 309 310 311 312 313 314 315
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
H
Haojun Liao 已提交
316
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
317 318 319 320 321 322 323 324
    destroyRequest(pRequest);
  }

  return pTscObj;
}

static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
  pMsgBody->msgType         = TSDB_MSG_TYPE_CONNECT;
325
  pMsgBody->msgInfo.len     = sizeof(SConnectMsg);
326 327 328 329
  pMsgBody->requestObjRefId = pRequest->self;

  SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
  if (pConnect == NULL) {
330
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
331
    return -1;
332 333
  }

334 335
  STscObj *pObj = pRequest->pTscObj;

336
  char* db = getConnectionDB(pObj);
337
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
338
  tfree(db);
339

340 341 342
  pConnect->pid = htonl(appInfo.pid);
  pConnect->startTime = htobe64(appInfo.startTime);
  tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
343

344
  pMsgBody->msgInfo.pMsg = pConnect;
345 346 347
  return 0;
}

H
Haojun Liao 已提交
348
static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) {
H
Haojun Liao 已提交
349
  assert(pMsgBody != NULL);
350
  tfree(pMsgBody->msgInfo.pMsg);
H
Haojun Liao 已提交
351 352
}

353
int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) {
354
  char *pMsg = rpcMallocCont(pBody->msgInfo.len);
355 356
  if (NULL == pMsg) {
    tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, taosMsg[pBody->msgType]);
357
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
358
    return -1;
359 360
  }

361
  memcpy(pMsg, pBody->msgInfo.pMsg, pBody->msgInfo.len);
362 363 364
  SRpcMsg rpcMsg = {
      .msgType = pBody->msgType,
      .pCont   = pMsg,
365
      .contLen = pBody->msgInfo.len,
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
      .ahandle = (void*) pBody->requestObjRefId,
      .handle  = NULL,
      .code    = 0
  };

  rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId);
  return TSDB_CODE_SUCCESS;
}

void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  int64_t requestRefId = (int64_t)pMsg->ahandle;

  SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(tscReqRef, requestRefId);
  if (pRequest == NULL) {
    rpcFreeCont(pMsg->pCont);
    return;
  }
383

384 385
  assert(pRequest->self == requestRefId);
  pRequest->metric.rsp = taosGetTimestampMs();
386

387 388 389 390 391 392 393 394 395 396 397 398 399 400
  pRequest->code = pMsg->code;

  STscObj *pTscObj = pRequest->pTscObj;
  if (pEpSet) {
    if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
    }
  }

  /*
   * There is not response callback function for submit response.
   * The actual inserted number of points is the first number.
   */
  if (pMsg->code == TSDB_CODE_SUCCESS) {
401 402
    tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%"PRId64 " ms", pRequest->requestId, taosMsg[pMsg->msgType],
             tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start);
403
    if (handleRequestRspFp[pRequest->type]) {
H
Haojun Liao 已提交
404 405 406 407 408 409 410 411
      char *p = malloc(pMsg->contLen);
      if (p == NULL) {
        pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        terrno = pRequest->code;
      } else {
        memcpy(p, pMsg->pCont, pMsg->contLen);
        pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, p, pMsg->contLen);
      }
412 413
    }
  } else {
414 415
    tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%"PRId64" ms", pRequest->requestId, taosMsg[pMsg->msgType],
             tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start);
416 417
  }

H
Haojun Liao 已提交
418
  taosReleaseRef(tscReqRef, requestRefId);
419
  rpcFreeCont(pMsg->pCont);
H
Haojun Liao 已提交
420 421

  sem_post(&pRequest->body.rspSem);
422
}
423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448

TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN]      = {0};
  char dbStr[TSDB_DB_NAME_LEN] = {0};
  char userStr[TSDB_USER_LEN]  = {0};
  char passStr[TSDB_PASSWORD_LEN]   = {0};

  strncpy(ipStr,   ip,   MIN(TSDB_EP_LEN - 1, ipLen));
  strncpy(userStr, user, MIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, MIN(TSDB_PASSWORD_LEN - 1, passLen));
  strncpy(dbStr,   db,   MIN(TSDB_DB_NAME_LEN - 1, dbLen));
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
449 450 451 452
}

void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
453
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
454

H
Haojun Liao 已提交
455
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
456
    pRequest->type = TSDB_MSG_TYPE_SHOW_RETRIEVE;
H
Haojun Liao 已提交
457

458
    SRequestMsgBody body = buildRequestMsgImpl(pRequest);
H
Haojun Liao 已提交
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477

    int64_t transporterId = 0;
    STscObj* pTscObj = pRequest->pTscObj;
    sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);

    tsem_wait(&pRequest->body.rspSem);
    destroyRequestMsgBody(&body);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

  for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
478
    }
H
Haojun Liao 已提交
479 480 481 482 483 484
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

485
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
H
Haojun Liao 已提交
486 487 488 489 490 491 492 493
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

  int32_t offset = 0;
  for (int32_t i = 0; i < numOfCols; ++i) {
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
H
Haojun Liao 已提交
494
    pResultInfo->row[i]    = (char*) (pResultInfo->pData + offset * pResultInfo->numOfRows);
H
Haojun Liao 已提交
495 496
    pResultInfo->pCol[i]   = pResultInfo->row[i];
    offset += pResultInfo->fields[i].bytes;
497
  }
S
Shengliang Guan 已提交
498 499
}

500 501 502 503 504
char* getConnectionDB(STscObj* pObj) {
  char *p = NULL;
  pthread_mutex_lock(&pObj->mutex);
  p = strndup(pObj->db, tListLen(pObj->db));
  pthread_mutex_unlock(&pObj->mutex);
S
Shengliang Guan 已提交
505

506 507 508 509 510 511 512 513 514
  return p;
}

void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
  pthread_mutex_lock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
515