clientImpl.c 23.5 KB
Newer Older
1

2
#include "../../libs/scheduler/inc/schedulerInt.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5 6 7
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
8 9
#include "tdef.h"
#include "tep.h"
10
#include "tglobal.h"
11
#include "tmsgtype.h"
12 13
#include "tnote.h"
#include "tpagedfile.h"
14
#include "tref.h"
X
Xiaoyu Wang 已提交
15 16

#define CHECK_CODE_GOTO(expr, lable) \
H
Haojun Liao 已提交
17 18
  do {                               \
    int32_t code = expr;             \
X
Xiaoyu Wang 已提交
19
    if (TSDB_CODE_SUCCESS != code) { \
H
Haojun Liao 已提交
20 21 22
      terrno = code;                 \
      goto lable;                    \
    }                                \
X
Xiaoyu Wang 已提交
23
  } while (0)
24

25
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
26 27
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
28
static void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
29

H
Haojun Liao 已提交
30
  static bool stringLengthCheck(const char* str, size_t maxsize) {
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
  if (str == NULL) {
    return false;
  }

  size_t len = strlen(str);
  if (len <= 0 || len > maxsize) {
    return false;
  }

  return true;
}

static bool validateUserName(const char* user) {
  return stringLengthCheck(user, TSDB_USER_LEN - 1);
}

static bool validatePassword(const char* passwd) {
48
  return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
49 50 51 52 53 54
}

static bool validateDbName(const char* db) {
  return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1);
}

55 56 57 58 59 60 61
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
  return strdup(key);
}

static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
62
static void  setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema);
63 64

TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
65 66 67 68
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

  char tmp[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL) {
    if(!validateDbName(db)) {
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

    tstrncpy(tmp, db, sizeof(tmp));
    strdequote(tmp);
  }

85
  char secretEncrypt[32] = {0};
86 87 88 89 90 91
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

92
    taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
93 94 95 96
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

97
  SCorEpSet epSet = {0};
98 99 100 101 102 103 104 105 106 107 108 109 110 111
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.port[0] = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }

112 113
  char* key = getClusterKey(user, secretEncrypt, ip, port);

H
Haojun Liao 已提交
114
  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
115
  if (pInst == NULL) {
H
Haojun Liao 已提交
116 117
    SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
    p->mgmtEp       = epSet;
H
Haojun Liao 已提交
118
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
L
fix  
Liu Jicong 已提交
119
    /*p->pAppHbMgr = appHbMgrInit(p);*/
H
Haojun Liao 已提交
120
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
121

H
Haojun Liao 已提交
122
    pInst = &p;
123 124
  }

H
Haojun Liao 已提交
125 126
  tfree(key);
  return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst);
127 128
}

X
Xiaoyu Wang 已提交
129 130 131
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
132
    tscError("failed to malloc sqlObj");
X
Xiaoyu Wang 已提交
133
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
134 135
  }

X
Xiaoyu Wang 已提交
136 137 138 139 140
  (*pRequest)->sqlstr = malloc(sqlLen + 1);
  if ((*pRequest)->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self);
    (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
141 142
  }

X
Xiaoyu Wang 已提交
143 144 145
  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;
146

H
Haojun Liao 已提交
147
  tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
X
Xiaoyu Wang 已提交
148 149
  return TSDB_CODE_SUCCESS;
}
150

X
Xiaoyu Wang 已提交
151
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
H
Haojun Liao 已提交
152 153
  STscObj* pTscObj = pRequest->pTscObj;

X
Xiaoyu Wang 已提交
154
  SParseContext cxt = {
H
Haojun Liao 已提交
155 156 157 158
    .requestId = pRequest->requestId,
    .acctId = pTscObj->acctId,
    .db     = getConnectionDB(pTscObj),
    .pTransporter = pTscObj->pTransporter,
159
    .pSql   = pRequest->sqlstr,
X
Xiaoyu Wang 已提交
160
    .sqlLen = pRequest->sqlLen,
161
    .pMsg   = pRequest->msgBuf,
X
Xiaoyu Wang 已提交
162 163
    .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
  };
H
Haojun Liao 已提交
164

H
Haojun Liao 已提交
165 166
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
H
Haojun Liao 已提交
167
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
168
    tfree(cxt.db);
H
Haojun Liao 已提交
169 170 171 172
    return code;
  }

  code = qParseQuerySql(&cxt, pQuery);
173

H
Haojun Liao 已提交
174
  tfree(cxt.db);
X
Xiaoyu Wang 已提交
175 176
  return code;
}
H
Haojun Liao 已提交
177

X
Xiaoyu Wang 已提交
178 179 180
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
  SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
  pRequest->type = pDcl->msgType;
H
Haojun Liao 已提交
181
  pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen};
X
Xiaoyu Wang 已提交
182

H
Haojun Liao 已提交
183
  STscObj* pTscObj = pRequest->pTscObj;
184
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
X
Xiaoyu Wang 已提交
185

186
  int64_t transporterId = 0;
187 188 189 190
  if (pDcl->msgType == TDMT_VND_CREATE_TABLE || pDcl->msgType == TDMT_VND_SHOW_TABLES) {
    if (pDcl->msgType == TDMT_VND_SHOW_TABLES) {
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      if (pShowReqInfo->pArray == NULL) {
H
Haojun Liao 已提交
191
        pShowReqInfo->currentIndex = 0;  // set the first vnode/ then iterate the next vnode
192 193 194
        pShowReqInfo->pArray = pDcl->pExtension;
      }
    }
195
    asyncSendMsgToServer(pTscObj->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
X
Xiaoyu Wang 已提交
196
  } else {
197
    SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
198
    asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, pSendMsg);
199 200
  }

X
Xiaoyu Wang 已提交
201 202 203
  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
204

205 206
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) {
  pRequest->type = pQueryNode->type;
207 208

  SReqResultInfo* pResInfo = &pRequest->body.resInfo;
209
  int32_t code = qCreateQueryDag(pQueryNode, pDag, pRequest->requestId);
210 211 212 213 214
  if (code != 0) {
    return code;
  }

  if (pQueryNode->type == TSDB_SQL_SELECT) {
215 216 217 218 219
    SArray* pa = taosArrayGetP((*pDag)->pSubplans, 0);

    SSubplan* pPlan = taosArrayGetP(pa, 0);
    SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema);
    setResSchemaInfo(pResInfo, pDataBlockSchema);
H
Haojun Liao 已提交
220

221
    pRequest->type = TDMT_VND_QUERY;
222 223 224
  }

  return code;
X
Xiaoyu Wang 已提交
225 226
}

227 228 229 230 231 232 233 234 235 236
void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema) {
  assert(pDataBlockSchema != NULL && pDataBlockSchema->numOfCols > 0);

  pResInfo->numOfCols = pDataBlockSchema->numOfCols;
  pResInfo->fields = calloc(pDataBlockSchema->numOfCols, sizeof(pDataBlockSchema->pSchema[0]));

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    SSchema* pSchema = &pDataBlockSchema->pSchema[i];
    pResInfo->fields[i].bytes = pSchema->bytes;
    pResInfo->fields[i].type  = pSchema->type;
237
    tstrncpy(pResInfo->fields[i].name, pSchema->name, tListLen(pResInfo->fields[i].name));
238
  }
X
Xiaoyu Wang 已提交
239 240
}

241
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
242
  if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
X
Xiaoyu Wang 已提交
243
    SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
244

245
    int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
246 247 248
    if (code != TSDB_CODE_SUCCESS) {
      // handle error and retry
    } else {
249 250
      if (pRequest->body.pQueryJob != NULL) {
        scheduleFreeJob(pRequest->body.pQueryJob);
251 252 253
      }
    }

H
Haojun Liao 已提交
254 255 256
    pRequest->body.resInfo.numOfRows = res.numOfRows;
    pRequest->code = res.code;
    return pRequest->code;
X
Xiaoyu Wang 已提交
257
  }
258

259
  return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
X
Xiaoyu Wang 已提交
260
}
X
Xiaoyu Wang 已提交
261

L
Liu Jicong 已提交
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
typedef struct tmq_t tmq_t;

typedef struct SMqClientTopic {
  // subscribe info
  int32_t sqlLen;
  char*   sql;
  char*   topicName;
  int64_t topicId;
  // statistics
  int64_t consumeCnt;
  // offset
  int64_t committedOffset;
  int64_t currentOffset;
  //connection info
  int32_t vgId;
  SEpSet  epSet;
} SMqClientTopic;

typedef struct tmq_resp_err_t {
  int32_t code;
} tmq_resp_err_t;

typedef struct tmq_topic_vgroup_list_t {
  char* topicName;
  int32_t vgId;
  int64_t committedOffset;
} tmq_topic_vgroup_list_t;

typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));

typedef struct tmq_conf_t{
  char*          clientId;
  char*          groupId;
  char*          ip;
  uint16_t       port;
  tmq_commit_cb* commit_cb;
} tmq_conf_t;

struct tmq_t {
  char           groupId[256];
  char           clientId[256];
  STscObj*       pTscObj;
  tmq_commit_cb* commit_cb;
  SArray*        clientTopics;  // SArray<SMqClientTopic>
};

void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
  conf->commit_cb = cb;
}

SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
  tmq_t* pTmq = (void*)param;
  SArray* pArray = taosArrayInit(0, sizeof(SKv));
  if (pArray == NULL) {
    return NULL;
  }
  SKv kv = {0};
  kv.key = malloc(256);
  if (kv.key == NULL) {
    taosArrayDestroy(pArray);
    return NULL;
  }
  strcpy(kv.key, "groupId");
  kv.keyLen = strlen("groupId") + 1;
  kv.value = malloc(256);
  if (kv.value == NULL) {
    free(kv.key);
    taosArrayDestroy(pArray);
    return NULL;
  }
  strcpy(kv.value, pTmq->groupId);
  kv.valueLen = strlen(pTmq->groupId) + 1;

  taosArrayPush(pArray, &kv);
  strcpy(kv.key, "clientUid");
  kv.keyLen = strlen("clientUid") + 1;
  *(uint32_t*)kv.value = pTmq->pTscObj->connId;
  kv.valueLen = sizeof(uint32_t); 
  
  return NULL;
}

tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
  tmq_t* pTmq = malloc(sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }
  strcpy(pTmq->groupId, conf->groupId);
  strcpy(pTmq->clientId, conf->clientId);
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;

  return pTmq;
}

L
Liu Jicong 已提交
357 358 359 360 361 362 363 364 365 366
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {
  STscObj* pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
  SQueryNode*  pQuery = NULL;
  SQueryDag*   pDag = NULL;
  char *dagStr = NULL;

  terrno = TSDB_CODE_SUCCESS;

  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
L
Liu Jicong 已提交
367 368 369

//temporary disabled until planner ready
#if 0
L
Liu Jicong 已提交
370 371 372 373 374 375 376 377 378
  CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
  //TODO: check sql valid

  CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);

  dagStr = qDagToString(pDag);
  if(dagStr == NULL) {
    //TODO
  }
L
Liu Jicong 已提交
379
#endif
L
Liu Jicong 已提交
380 381 382 383

  SCMCreateTopicReq req = {
    .name = (char*)name,
    .igExists = 0,
L
Liu Jicong 已提交
384 385 386
    /*.physicalPlan = dagStr,*/
    .physicalPlan = (char*)sql,
    .logicalPlan = "",
L
Liu Jicong 已提交
387 388
  };

L
Liu Jicong 已提交
389 390 391 392 393 394 395 396
  int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
  void* buf = malloc(tlen);
  if(buf == NULL) {
    goto _return;
  }
  void* abuf = buf;
  tSerializeSCMCreateTopicReq(&abuf, &req);
  /*printf("formatted: %s\n", dagStr);*/
L
Liu Jicong 已提交
397 398 399

  pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };

400
  SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417
  SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);

_return:
  qDestroyQuery(pQuery);
  qDestroyQueryDag(pDag); 
  destroySendMsgInfo(body);
  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }
  return pRequest;
}

L
Liu Jicong 已提交
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436
typedef struct tmq_message_t {
  int32_t  numOfRows; 
  char*    topicName;
  TAOS_ROW row[];
} tmq_message_t;

tmq_message_t* tmq_consume_poll(tmq_t* mq, int64_t blocking_time) {
  return NULL;
}

tmq_resp_err_t* tmq_commit(tmq_t* mq, void* callback, int32_t async) {
  return NULL;
}

void tmq_message_destroy(tmq_message_t* mq_message) {
  
}


X
Xiaoyu Wang 已提交
437 438 439 440 441 442
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
  STscObj *pTscObj = (STscObj *)taos;
  if (sqlLen > (size_t) tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
443 444
  }

X
Xiaoyu Wang 已提交
445 446
  nPrintTsc("%s", sql)

H
Haojun Liao 已提交
447 448 449
  SRequestObj *pRequest = NULL;
  SQueryNode  *pQuery   = NULL;
  SQueryDag   *pDag     = NULL;
X
Xiaoyu Wang 已提交
450

X
Xiaoyu Wang 已提交
451
  terrno = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
452 453
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
H
Haojun Liao 已提交
454

X
Xiaoyu Wang 已提交
455 456
  if (qIsDdlQuery(pQuery)) {
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
H
Haojun Liao 已提交
457
  } else {
X
Xiaoyu Wang 已提交
458
    CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return);
459
    CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag), _return);
X
Xiaoyu Wang 已提交
460
    pRequest->code = terrno;
X
Xiaoyu Wang 已提交
461 462 463
  }

_return:
L
Liu Jicong 已提交
464
  qDestroyQuery(pQuery);
X
Xiaoyu Wang 已提交
465
  qDestroyQueryDag(pDag);
X
Xiaoyu Wang 已提交
466
  if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
X
Xiaoyu Wang 已提交
467 468
    pRequest->code = terrno;
  }
469

470 471 472
  return pRequest;
}

473
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
474 475
  pEpSet->version = 0;

H
Haojun Liao 已提交
476
  // init mnode ip set
477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
  SEpSet *mgmtEpSet   = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse    = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(firstEp, mgmtEpSet->fqdn[0], &(mgmtEpSet->port[0]));
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(secondEp, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
    mgmtEpSet->numOfEps++;
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

509
STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
510
  STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
511 512 513 514 515
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

H
Hongze Cheng 已提交
516
  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
517 518 519
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
520 521 522
    return NULL;
  }

523
  SMsgSendInfo* body = buildConnectMsg(pRequest);
524 525

  int64_t transporterId = 0;
H
Haojun Liao 已提交
526
  asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
527 528 529 530 531 532 533 534 535 536

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
537
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pTransporter, pRequest->requestId);
538 539 540 541 542 543
    destroyRequest(pRequest);
  }

  return pTscObj;
}

544 545 546 547 548 549 550
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
  SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

H
Haojun Liao 已提交
551
  pMsgSendInfo->msgType         = TDMT_MND_CONNECT;
S
Shengliang Guan 已提交
552
  pMsgSendInfo->msgInfo.len     = sizeof(SConnectReq);
553 554
  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId       = pRequest->requestId;
D
catalog  
dapan1121 已提交
555
  pMsgSendInfo->fp              = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
556
  pMsgSendInfo->param           = pRequest;
557

S
Shengliang Guan 已提交
558
  SConnectReq *pConnect = calloc(1, sizeof(SConnectReq));
559
  if (pConnect == NULL) {
560
    tfree(pMsgSendInfo);
561
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
562
    return NULL;
563 564
  }

565 566
  STscObj *pObj = pRequest->pTscObj;

567
  char* db = getConnectionDB(pObj);
568
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
569
  tfree(db);
570

571 572 573
  pConnect->pid = htonl(appInfo.pid);
  pConnect->startTime = htobe64(appInfo.startTime);
  tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
574

575 576
  pMsgSendInfo->msgInfo.pData = pConnect;
  return pMsgSendInfo;
577 578
}

579
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
H
Haojun Liao 已提交
580
  assert(pMsgBody != NULL);
581 582
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
H
Haojun Liao 已提交
583 584
}

585
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
586 587
  SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
  assert(pMsg->ahandle != NULL);
588

589
  if (pSendInfo->requestObjRefId != 0) {
H
Haojun Liao 已提交
590
    SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
591
    assert(pRequest->self == pSendInfo->requestObjRefId);
592

593 594
    pRequest->metric.rsp = taosGetTimestampMs();
    pRequest->code = pMsg->code;
595

596 597 598 599 600
    STscObj *pTscObj = pRequest->pTscObj;
    if (pEpSet) {
      if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      }
601 602
    }

603
    /*
604 605
   * There is not response callback function for submit response.
   * The actual inserted number of points is the first number.
606
     */
607
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
608
    if (pMsg->code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
609
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
610
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
611
    } else {
H
Haojun Liao 已提交
612
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
613
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
614
    }
615

H
Haojun Liao 已提交
616
    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
617 618
  }

619 620 621 622 623 624 625 626 627 628
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
629 630
  }

631
  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
632
  rpcFreeCont(pMsg->pCont);
633
  destroySendMsgInfo(pSendInfo);
634
}
635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660

TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN]      = {0};
  char dbStr[TSDB_DB_NAME_LEN] = {0};
  char userStr[TSDB_USER_LEN]  = {0};
  char passStr[TSDB_PASSWORD_LEN]   = {0};

  strncpy(ipStr,   ip,   MIN(TSDB_EP_LEN - 1, ipLen));
  strncpy(userStr, user, MIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, MIN(TSDB_PASSWORD_LEN - 1, passLen));
  strncpy(dbStr,   db,   MIN(TSDB_DB_NAME_LEN - 1, dbLen));
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
661 662 663 664
}

void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
665
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
666

H
Haojun Liao 已提交
667
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
668
    if (pRequest->type == TDMT_VND_QUERY) {
669 670 671 672 673
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        return NULL;
      }

D
dapan1121 已提交
674
      scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
H
Haojun Liao 已提交
675 676
      setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData);

677
      if (pResultInfo->numOfRows == 0) {
D
dapan1121 已提交
678 679 680 681
        return NULL;
      }

      goto _return;
682
    } else if (pRequest->type == TDMT_MND_SHOW) {
H
Haojun Liao 已提交
683 684 685
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

      int64_t  transporterId = 0;
      STscObj *pTscObj = pRequest->pTscObj;
      asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
      tsem_wait(&pRequest->body.rspSem);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
709
    }
H
Haojun Liao 已提交
710

711
    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
H
Haojun Liao 已提交
712

713 714
    int64_t  transporterId = 0;
    STscObj *pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
715
    asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
H
Haojun Liao 已提交
716 717 718 719 720 721 722 723 724

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

D
dapan1121 已提交
725 726
_return:

H
Haojun Liao 已提交
727 728 729 730 731
  for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
732
    }
H
Haojun Liao 已提交
733 734 735 736 737 738
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

H
Haojun Liao 已提交
739 740 741 742 743 744 745 746
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row == NULL) {
    pResInfo->row    = calloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol   = calloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
  }
}

747
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
H
Haojun Liao 已提交
748 749 750 751 752
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

H
Haojun Liao 已提交
753 754 755
  // todo check for the failure of malloc
  doPrepareResPtr(pResultInfo);

H
Haojun Liao 已提交
756 757 758
  int32_t offset = 0;
  for (int32_t i = 0; i < numOfCols; ++i) {
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
H
Haojun Liao 已提交
759
    pResultInfo->row[i]    = (char*) (pResultInfo->pData + offset * pResultInfo->numOfRows);
H
Haojun Liao 已提交
760 761
    pResultInfo->pCol[i]   = pResultInfo->row[i];
    offset += pResultInfo->fields[i].bytes;
762
  }
S
Shengliang Guan 已提交
763 764
}

765 766 767 768 769
char* getConnectionDB(STscObj* pObj) {
  char *p = NULL;
  pthread_mutex_lock(&pObj->mutex);
  p = strndup(pObj->db, tListLen(pObj->db));
  pthread_mutex_unlock(&pObj->mutex);
S
Shengliang Guan 已提交
770

771 772 773 774 775 776 777 778 779
  return p;
}

void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
  pthread_mutex_lock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
780

781 782 783 784
void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
  assert(pResultInfo != NULL && pRsp != NULL);

  pResultInfo->pRspMsg   = (const char*) pRsp;
H
Haojun Liao 已提交
785 786
  pResultInfo->pData     = (void*) pRsp->data;
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
787 788
  pResultInfo->current   = 0;
  pResultInfo->completed = (pRsp->completed == 1);
H
Haojun Liao 已提交
789 790

  setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
L
fix  
Liu Jicong 已提交
791
}