clientImpl.c 26.4 KB
Newer Older
1

2
#include "../../libs/scheduler/inc/schedulerInt.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5 6 7
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
8 9
#include "tdef.h"
#include "tep.h"
10
#include "tglobal.h"
11
#include "tmsgtype.h"
12 13
#include "tnote.h"
#include "tpagedfile.h"
14
#include "tref.h"
X
Xiaoyu Wang 已提交
15

16
/*
 * Evaluate `expr` exactly once; when the result is not TSDB_CODE_SUCCESS,
 * store it in terrno and jump to `label`.
 *
 * The argument is parenthesized and the temporary uses a name that is
 * unlikely to collide with caller variables, so CHECK_CODE_GOTO(code, out)
 * works even when the caller has its own `code` variable in scope.
 */
#define CHECK_CODE_GOTO(expr, label)       \
  do {                                     \
    int32_t chkCode_ = (expr);             \
    if (TSDB_CODE_SUCCESS != chkCode_) {   \
      terrno = chkCode_;                   \
      goto label;                          \
    }                                      \
  } while (0)
24

25
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
26 27
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
28
static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
29

30
/*
 * Return true when `str` is a non-NULL, non-empty string whose length does
 * not exceed `maxsize` characters (terminator not counted).
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (NULL == str) {
    return false;
  }

  const size_t n = strlen(str);
  return (n > 0) && (n <= maxsize);
}

/* A user name is accepted when it is non-empty and at most TSDB_USER_LEN - 1 characters long. */
static bool validateUserName(const char* user) {
  const size_t maxLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxLen);
}

static bool validatePassword(const char* passwd) {
48
  return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
49 50 51 52 53 54
}

/* A database name is accepted when it is non-empty and at most TSDB_DB_NAME_LEN - 1 characters long. */
static bool validateDbName(const char* db) {
  const size_t maxLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxLen);
}

55 56 57 58 59 60
/*
 * Build the "user:auth:ip:port" key used to look up an application instance.
 *
 * NULL string arguments are rendered as empty fields (passing NULL to "%s"
 * is undefined behaviour; the caller passes a NULL ip when the endpoints
 * come from the configuration). The buffer is sized from the formatted
 * length, so long inputs are never truncated.
 *
 * Returns a heap-allocated string the caller must free, or NULL when
 * formatting or allocation fails.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  const char* u = (user != NULL) ? user : "";
  const char* a = (auth != NULL) ? auth : "";
  const char* h = (ip   != NULL) ? ip   : "";

  // First pass only measures the formatted length.
  int len = snprintf(NULL, 0, "%s:%s:%s:%d", u, a, h, port);
  if (len < 0) {
    return NULL;
  }

  char* key = malloc((size_t)len + 1);
  if (key == NULL) {
    return NULL;
  }

  snprintf(key, (size_t)len + 1, "%s:%s:%s:%d", u, a, h, port);
  return key;
}

61
static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
62
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
63 64

TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
65 66 67 68
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

69 70 71 72 73
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

74
  char localDb[TSDB_DB_NAME_LEN] = {0};
75 76 77 78 79 80
  if (db != NULL) {
    if(!validateDbName(db)) {
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

81 82
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
83 84
  }

85
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
86 87 88 89 90 91
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

92
    taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
93 94 95 96
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

97
  SCorEpSet epSet = {0};
98 99 100 101 102 103 104 105 106 107 108 109 110 111
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.port[0] = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }

112 113
  char* key = getClusterKey(user, secretEncrypt, ip, port);

114
  // TODO: race condition here.
H
Haojun Liao 已提交
115
  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
116
  if (pInst == NULL) {
H
Haojun Liao 已提交
117 118
    SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
    p->mgmtEp       = epSet;
H
Haojun Liao 已提交
119
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
L
Liu Jicong 已提交
120
    /*p->pAppHbMgr = appHbMgrInit(p);*/
H
Haojun Liao 已提交
121
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
122

H
Haojun Liao 已提交
123
    pInst = &p;
124 125
  }

H
Haojun Liao 已提交
126
  tfree(key);
127
  return taosConnectImpl(user, &secretEncrypt[0], localDb, port, NULL, NULL, *pInst);
128 129
}

X
Xiaoyu Wang 已提交
130 131 132
/*
 * Create a request object for `pTscObj` and attach a lower-cased,
 * NUL-terminated copy of the first `sqlLen` bytes of `sql`.
 *
 * On return *pRequest holds the request (NULL if it could not be created
 * or the input was rejected). Returns TSDB_CODE_SUCCESS,
 * TSDB_CODE_TSC_INVALID_INPUT for a NULL sql / negative length, or
 * TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
  // A negative length would under-allocate the buffer and write the
  // terminator before its start; reject it up front.
  if (sql == NULL || sqlLen < 0) {
    *pRequest = NULL;
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
    tscError("failed to malloc sqlObj");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  (*pRequest)->sqlstr = malloc(sqlLen + 1);
  if ((*pRequest)->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self);
    (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;

  tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
  return TSDB_CODE_SUCCESS;
}
151

X
Xiaoyu Wang 已提交
152
/*
 * Parse the SQL text stored in `pRequest` into a query node.
 *
 * Fills a parse context from the request and its connection, obtains the
 * catalog handle for the connection's cluster, and runs qParseQuerySql().
 * The database name copy obtained from getConnectionDB() is released before
 * returning. Returns the first non-success code encountered.
 */
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
  STscObj* pConn = pRequest->pTscObj;

  SParseContext cxt = {
    .requestId    = pRequest->requestId,
    .acctId       = pConn->acctId,
    .db           = getConnectionDB(pConn),
    .pSql         = pRequest->sqlstr,
    .sqlLen       = pRequest->sqlLen,
    .pMsg         = pRequest->msgBuf,
    .msgLen       = ERROR_MSG_BUF_DEFAULT_SIZE,
    .pTransporter = pConn->pAppInfo->pTransporter,
  };

  cxt.mgmtEpSet = getEpSet_s(&pConn->pAppInfo->mgmtEp);

  int32_t rc = catalogGetHandle(pConn->pAppInfo->clusterId, &cxt.pCatalog);
  if (TSDB_CODE_SUCCESS == rc) {
    rc = qParseQuerySql(&cxt, pQuery);
  }

  // Single exit: the db copy is released on both the success and failure path.
  tfree(cxt.db);
  return rc;
}
H
Haojun Liao 已提交
178

X
Xiaoyu Wang 已提交
179 180 181
/*
 * Send the already-serialized DDL/DCL message carried by `pQuery` to the
 * endpoint set recorded in it, then block until the response semaphore of
 * the request is posted. Always returns TSDB_CODE_SUCCESS.
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
  SDclStmtInfo* pStmt = (SDclStmtInfo*)pQuery;

  pRequest->type = pStmt->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pStmt->pMsg, .len = pStmt->msgLen};

  STscObj*      pConn = pRequest->pTscObj;
  SMsgSendInfo* pInfo = buildMsgInfoImpl(pRequest);

  // For "show tables" remember the vgroup list on first use so that later
  // fetches can walk it one vnode at a time.
  if (pStmt->msgType == TDMT_VND_SHOW_TABLES) {
    SShowReqInfo* pShow = &pRequest->body.showInfo;
    if (NULL == pShow->pArray) {
      pShow->currentIndex = 0;  // start with the first vnode, then iterate
      pShow->pArray = pStmt->pExtension;
    }
  }

  // Every message type is dispatched the same way.
  int64_t transporterId = 0;
  asyncSendMsgToServer(pConn->pAppInfo->pTransporter, &pStmt->epSet, &transporterId, pInfo);

  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
204

205
/*
 * Build the query DAG for `pQueryNode` into *pDag.
 *
 * The request type is first set to the query node type. After a successful
 * plan of a SELECT, the result schema is recorded on the request and the
 * request type becomes TDMT_VND_QUERY. Returns the code from
 * qCreateQueryDag().
 */
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, SArray* pNodeList) {
  pRequest->type = pQueryNode->type;

  SSchema* pResSchema = NULL;
  int32_t  nCols = 0;

  const int32_t rc = qCreateQueryDag(pQueryNode, pDag, &pResSchema, &nCols, pNodeList, pRequest->requestId);
  if (rc == 0 && pQueryNode->type == TSDB_SQL_SELECT) {
    setResSchemaInfo(&pRequest->body.resInfo, pResSchema, nCols);
    pRequest->type = TDMT_VND_QUERY;
  }

  return rc;
}

223 224
/*
 * Copy the column description (bytes, type, name) of `numOfCols` schema
 * entries into a newly allocated pResInfo->fields array.
 *
 * The array is sized by the element type of `fields`, not by the schema
 * element type, so the two structures may differ in size. If the allocation
 * fails, terrno is set to TSDB_CODE_TSC_OUT_OF_MEMORY and numOfCols is left
 * at 0 so no caller iterates over a NULL array.
 */
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  assert(pSchema != NULL && numOfCols > 0);

  pResInfo->fields = calloc(numOfCols, sizeof(pResInfo->fields[0]));
  if (pResInfo->fields == NULL) {
    pResInfo->numOfCols = 0;
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  pResInfo->numOfCols = numOfCols;
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    pResInfo->fields[i].bytes = pSchema[i].bytes;
    pResInfo->fields[i].type  = pSchema[i].type;
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
  }
}

236
/*
 * Hand the query DAG to the scheduler.
 *
 * INSERT and CREATE TABLE requests run synchronously through
 * scheduleExecJob(); the row count and result code reported by the scheduler
 * are stored on the request and the result code is returned. Every other
 * request type is submitted with scheduleAsyncExecJob().
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
  const bool runSync = (TSDB_SQL_INSERT == pRequest->type) || (TSDB_SQL_CREATE_TABLE == pRequest->type);
  if (!runSync) {
    return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob);
  }

  SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};

  int32_t rc = scheduleExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);

  // The job is released only after a successful run.
  // NOTE(review): a failing return code is not acted on here (error handling
  // and retry are still to be written); only res.code reaches the caller.
  if (rc == TSDB_CODE_SUCCESS && pRequest->body.pQueryJob != NULL) {
    scheduleFreeJob(pRequest->body.pQueryJob);
  }

  pRequest->body.resInfo.numOfRows = res.numOfRows;
  pRequest->code = res.code;
  return pRequest->code;
}
X
Xiaoyu Wang 已提交
256

L
Liu Jicong 已提交
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
// Opaque consumer handle; the full definition follows below.
typedef struct tmq_t tmq_t;

// Client-side record of one subscribed topic, stored in tmq_t.clientTopics.
typedef struct SMqClientTopic {
  // subscribe info
  int32_t sqlLen;
  char*   sql;
  char*   topicName;
  int64_t topicId;
  // statistics
  int64_t consumeCnt;
  // offset
  int64_t committedOffset;
  int64_t currentOffset;
  // connection info
  int32_t vgId;   // tmqGetConnInfo() reports status 1 when any topic has vgId == -1
  SEpSet  epSet;
} SMqClientTopic;

// Error value handed to the commit callback.
typedef struct tmq_resp_err_t {
  int32_t code;
} tmq_resp_err_t;

// Topic / vgroup / committed-offset triple handed to the commit callback.
typedef struct tmq_topic_vgroup_list_t {
  char* topicName;
  int32_t vgId;
  int64_t committedOffset;
} tmq_topic_vgroup_list_t;

// Signature of the offset-commit callback: consumer, error, topic/vgroup info, user parameter.
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));

// Consumer configuration supplied to tmqCreateConsumerImpl().
typedef struct tmq_conf_t{
  char*          clientId;
  char*          groupId;
  char*          ip;
  uint16_t       port;
  tmq_commit_cb* commit_cb;  // set through tmq_conf_set_offset_commit_cb()
} tmq_conf_t;

// Consumer state: identity strings, the underlying connection and the subscribed topics.
struct tmq_t {
  char           groupId[256];
  char           clientId[256];
  STscObj*       pTscObj;
  tmq_commit_cb* commit_cb;
  SArray*        clientTopics;  // SArray<SMqClientTopic>
};

/*
 * Register `cb` as the offset-commit callback of `conf`.
 * A NULL `conf` is ignored instead of being dereferenced.
 */
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
  if (conf == NULL) {
    return;
  }

  conf->commit_cb = cb;
}

/*
 * Build the heartbeat key/value list for a message-queue consumer.
 *
 * `param` is the tmq_t consumer. On success the returned array holds one
 * SKv whose key is "mq-tmp" and whose value is an SMqHbMsg carrying the
 * connection id and a status flag (1 when any subscribed topic has
 * vgId == -1, 0 otherwise).
 *
 * Returns NULL when the array or the key cannot be allocated, and an empty
 * array when the heartbeat message cannot be allocated (the key is released
 * in that case rather than leaked).
 */
SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
  tmq_t* pTmq = (void*)param;
  SArray* pArray = taosArrayInit(0, sizeof(SKv));
  if (pArray == NULL) {
    return NULL;
  }

  SKv kv = {0};
  kv.key = malloc(256);
  if (kv.key == NULL) {
    taosArrayDestroy(pArray);
    return NULL;
  }
  strcpy(kv.key, "mq-tmp");
  kv.keyLen = strlen("mq-tmp") + 1;

  // calloc so that `status` (and any field not assigned below) has a defined
  // value when no topic matches the vgId == -1 condition.
  SMqHbMsg* pMqHb = calloc(1, sizeof(SMqHbMsg));
  if (pMqHb == NULL) {
    tfree(kv.key);
    return pArray;
  }

  pMqHb->consumerId = connKey.connId;
  SArray* clientTopics = pTmq->clientTopics;
  int sz = taosArrayGetSize(clientTopics);
  for (int i = 0; i < sz; i++) {
    SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
    if (pCTopic->vgId == -1) {
      pMqHb->status = 1;
      break;
    }
  }

  kv.value = pMqHb;
  kv.valueLen = sizeof(SMqHbMsg);
  taosArrayPush(pArray, &kv);

  return pArray;
}

/*
 * Create a consumer bound to connection `conn` from configuration `conf`.
 *
 * The group and client ids are copied with truncation to the size of the
 * destination buffers; a NULL id leaves the field empty. The commit callback
 * is taken from `conf`, and the topic list starts out as an empty array so
 * that tmqGetConnInfo() can iterate it safely. The connection is marked as a
 * message-queue connection.
 *
 * Returns NULL when `conn` or `conf` is NULL or an allocation fails.
 */
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
  if (conn == NULL || conf == NULL) {
    return NULL;
  }

  // Zero-initialised so every field has a defined value.
  tmq_t* pTmq = calloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }

  if (conf->groupId != NULL) {
    tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  }
  if (conf->clientId != NULL) {
    tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  }

  pTmq->commit_cb = conf->commit_cb;
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    free(pTmq);
    return NULL;
  }

  pTmq->pTscObj = (STscObj*)conn;
  pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;

  return pTmq;
}

L
Liu Jicong 已提交
355
/*
 * Create a topic named `topicName` from the query in `sql`.
 *
 * The SQL is parsed and planned on the client; the serialized physical plan
 * and the original SQL are sent to the management node in a
 * TDMT_MND_CREATE_TOPIC message, and the call blocks until the response
 * semaphore of the request is posted.
 *
 * Returns the request object (its `code` carries terrno on failure), or
 * NULL when the request could not be created.
 */
TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
  STscObj     *pTscObj = (STscObj*)taos;
  SRequestObj *pRequest = NULL;
  SQueryNode  *pQueryNode = NULL;
  char        *pStr = NULL;

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || topicName == NULL || sql == NULL) {
    // NULL must not be passed to "%s"; substitute a printable placeholder.
    tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos,
             (topicName != NULL) ? topicName : "(null)", (sql != NULL) ? sql : "(null)");
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
    tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen < 0) {
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create topic, %s", topicName);

  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);

  SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo* ) pQueryNode;
  pQueryStmtInfo->info.continueQuery = true;

  // todo check for invalid sql statement and return with error code

  SSchema *schema = NULL;
  int32_t  numOfCols = 0;
  CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, &schema, &numOfCols, NULL, pRequest->requestId), _return);

  // NOTE(review): ownership of the string returned by qDagToString() is not
  // visible here; confirm whether it must be freed once it is serialized.
  pStr = qDagToString(pRequest->body.pDag);
  if (pStr == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _return;
  }

  printf("%s\n", pStr);

  // The topic should be related to a database that the queried table is belonged to.
  SName name = {0};
  char dbName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(&((SQueryStmtInfo*) pQueryNode)->pTableMetaInfo[0]->name, dbName);

  tNameFromString(&name, dbName, T_NAME_ACCT|T_NAME_DB);
  tNameFromString(&name, topicName, T_NAME_TABLE);

  char topicFname[TSDB_TOPIC_FNAME_LEN] = {0};
  tNameExtractFullName(&name, topicFname);

  SCMCreateTopicReq req = {
    .name         = (char*) topicFname,
    .igExists     = 0,
    .physicalPlan = (char*) pStr,
    .sql          = (char*) sql,
    .logicalPlan  = "no logic plan",
  };

  // First call with a NULL buffer only computes the serialized length.
  int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
  void* buf = malloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _return;
  }

  void* abuf = buf;
  tSerializeSCMCreateTopicReq(&abuf, &req);
  /*printf("formatted: %s\n", dagStr);*/

  pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
  pRequest->type = TDMT_MND_CREATE_TOPIC;

  SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
  if (body == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _return;
  }

  SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);

_return:
  // The send-info is not released here: processMsgFromServer() destroys it
  // when the response arrives, and on the early-exit paths it was never
  // created.
  qDestroyQuery(pQueryNode);

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}

L
Liu Jicong 已提交
454
typedef struct tmq_message_t {
H
Haojun Liao 已提交
455
  int32_t  numOfRows;
L
Liu Jicong 已提交
456 457 458 459 460 461 462 463 464 465 466 467 468
  char*    topicName;
  TAOS_ROW row[];
} tmq_message_t;

/* Placeholder: polling is not implemented yet; both arguments are ignored and NULL is returned. */
tmq_message_t* tmq_consume_poll(tmq_t* mq, int64_t blocking_time) {
  (void)mq;
  (void)blocking_time;
  return NULL;
}

/* Placeholder: offset commit is not implemented yet; all arguments are ignored and NULL is returned. */
tmq_resp_err_t* tmq_commit(tmq_t* mq, void* callback, int32_t async) {
  (void)mq;
  (void)callback;
  (void)async;
  return NULL;
}

/* Placeholder: nothing is released yet; the message is left untouched. */
void tmq_message_destroy(tmq_message_t* mq_message) {
  (void)mq_message;
}


X
Xiaoyu Wang 已提交
473 474 475 476 477 478
/*
 * Execute the first `sqlLen` bytes of `sql` on connection `taos`.
 *
 * DDL statements are sent directly and waited for; everything else is
 * planned and handed to the scheduler.
 *
 * Returns the request object (its `code` carries terrno on failure), or
 * NULL with terrno set when the arguments are rejected or the request could
 * not be created.
 */
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
  STscObj *pTscObj = (STscObj *)taos;
  if (taos == NULL || sql == NULL) {
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    return NULL;
  }

  // Negative lengths are rejected with the same code as over-long input,
  // stated explicitly instead of relying on a signed-to-unsigned conversion.
  if (sqlLen < 0 || sqlLen > tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  nPrintTsc("%s", sql)

  SRequestObj *pRequest = NULL;
  SQueryNode  *pQueryNode = NULL;
  SArray      *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
  if (pNodeList == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
  // NOTE(review): pNodeList is never released in this function; confirm
  // whether the scheduler takes ownership on every path.

  terrno = TSDB_CODE_SUCCESS;
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);

  if (qIsDdlQuery(pQueryNode)) {
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return);
  } else {
    CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, pNodeList), _return);
    CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
    pRequest->code = terrno;
  }

_return:
  qDestroyQuery(pQueryNode);
  if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
    pRequest->code = terrno;
  }

  return pRequest;
}

508
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
509 510
  pEpSet->version = 0;

H
Haojun Liao 已提交
511
  // init mnode ip set
512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
  SEpSet *mgmtEpSet   = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse    = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(firstEp, mgmtEpSet->fqdn[0], &(mgmtEpSet->port[0]));
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(secondEp, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
    mgmtEpSet->numOfEps++;
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

544
/*
 * Create a connection object on `pAppInfo`, send the connect request to the
 * management endpoint set and wait for the response.
 *
 * Returns the connection object on success. On failure every object created
 * here is released and NULL is returned; terrno is set on the allocation
 * failure paths.
 */
STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
  STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgSendInfo* body = buildConnectMsg(pRequest);
  if (body == NULL) {
    // buildConnectMsg() has set terrno. Without a message nothing would ever
    // post the response semaphore, so bail out instead of waiting forever.
    destroyRequest(pRequest);
    taos_close(pTscObj);
    return NULL;
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
    destroyRequest(pRequest);
  }

  return pTscObj;
}

579 580 581 582 583 584 585
/*
 * Allocate and fill the send-info plus SConnectReq payload for a
 * TDMT_MND_CONNECT message belonging to `pRequest`.
 *
 * Returns NULL with terrno = TSDB_CODE_TSC_OUT_OF_MEMORY when either
 * allocation fails. The payload is attached to the send-info as
 * msgInfo.pData.
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
  SMsgSendInfo *pInfo = calloc(1, sizeof(SMsgSendInfo));
  if (NULL == pInfo) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  SConnectReq *pReq = calloc(1, sizeof(SConnectReq));
  if (NULL == pReq) {
    tfree(pInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  // Envelope: message type, payload size, request identity and response handler.
  pInfo->msgType         = TDMT_MND_CONNECT;
  pInfo->msgInfo.len     = sizeof(SConnectReq);
  pInfo->requestObjRefId = pRequest->self;
  pInfo->requestId       = pRequest->requestId;
  pInfo->fp              = handleRequestRspFp[TMSG_INDEX(pInfo->msgType)];
  pInfo->param           = pRequest;

  // Payload: current database (if any), then pid and start time in network
  // byte order, then the application name.
  char* curDb = getConnectionDB(pRequest->pTscObj);
  if (NULL != curDb) {
    tstrncpy(pReq->db, curDb, sizeof(pReq->db));
  }
  tfree(curDb);

  pReq->pid       = htonl(appInfo.pid);
  pReq->startTime = htobe64(appInfo.startTime);
  tstrncpy(pReq->app, appInfo.appName, tListLen(pReq->app));

  pInfo->msgInfo.pData = pReq;
  return pInfo;
}

616
/* Release a send-info object together with the payload buffer attached to it. */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(NULL != pMsgBody);

  // Payload first, then the envelope that points at it.
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
}

622
/*
 * RPC response entry point.
 *
 * Records the response time and code on the originating request, refreshes
 * the management endpoint set when the server reports a different one,
 * copies the response body and passes it to the handler stored in the
 * send-info. The RPC buffer and the send-info are released before returning.
 *
 * If the originating request can no longer be acquired from the reference
 * pool, the response is dropped: the handler is not called, because its
 * parameter may refer to the released request.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
  assert(pMsg->ahandle != NULL);

  if (pSendInfo->requestObjRefId != 0) {
    SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (pRequest == NULL) {
      rpcFreeCont(pMsg->pCont);
      destroySendMsgInfo(pSendInfo);
      return;
    }
    assert(pRequest->self == pSendInfo->requestObjRefId);

    pRequest->metric.rsp = taosGetTimestampMs();
    pRequest->code = pMsg->code;

    STscObj *pTscObj = pRequest->pTscObj;
    if (pEpSet) {
      if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      }
    }

    /*
     * There is no response callback function for the submit response.
     * The actual inserted number of points is the first number.
     */
    int32_t elapsed = (int32_t)(pRequest->metric.rsp - pRequest->metric.start);
    if (pMsg->code == TSDB_CODE_SUCCESS) {
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
    } else {
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
    }

    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
  }

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
      buf.len = 0;  // never advertise a length for a buffer that does not exist
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697

TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

/*
 * Copy at most `len` bytes of `src` (and never more than cap - 1) into
 * `dst` and NUL-terminate it. Returns `dst`, or NULL when the source is
 * absent: a NULL pointer, a non-positive length, or an empty string.
 */
static const char* copyLenBoundedStr(char* dst, size_t cap, const char* src, int len) {
  if (src == NULL || len <= 0) {
    return NULL;
  }

  size_t n = (size_t)len;
  if (n > cap - 1) {
    n = cap - 1;
  }

  strncpy(dst, src, n);  // stops early if src is shorter than n
  dst[n] = '\0';
  return (dst[0] != '\0') ? dst : NULL;
}

/*
 * Length-delimited variant of taos_connect(): each string argument comes
 * with an explicit byte length and need not be NUL-terminated.
 *
 * Absent arguments (NULL pointer, non-positive length or empty text) are
 * forwarded as NULL rather than as empty strings.
 * NOTE(review): this assumes taos_connect() treats NULL as "use the
 * default", which is not visible here; confirm against its implementation.
 */
TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN]         = {0};
  char dbStr[TSDB_DB_NAME_LEN]    = {0};
  char userStr[TSDB_USER_LEN]     = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};

  const char* pIp   = copyLenBoundedStr(ipStr,   sizeof(ipStr),   ip,   ipLen);
  const char* pUser = copyLenBoundedStr(userStr, sizeof(userStr), user, userLen);
  const char* pPass = copyLenBoundedStr(passStr, sizeof(passStr), pass, passLen);
  const char* pDb   = copyLenBoundedStr(dbStr,   sizeof(dbStr),   db,   dbLen);

  return taos_connect(pIp, pUser, pPass, pDb, port);
}

void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
702
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
703

H
Haojun Liao 已提交
704 705
  SEpSet epSet = {0};

H
Haojun Liao 已提交
706
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
707
    if (pRequest->type == TDMT_VND_QUERY) {
708 709 710 711 712
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        return NULL;
      }

713 714
      SReqResultInfo* pResInfo = &pRequest->body.resInfo;
      int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pResInfo->pData);
715 716 717 718
      if (code != TSDB_CODE_SUCCESS) {
        pRequest->code = code;
        return NULL;
      }
H
Haojun Liao 已提交
719

720 721 722 723
      setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
      tscDebug("0x%"PRIx64 " fetch results, numOfRows:%d total Rows:%"PRId64", complete:%d, reqId:0x%"PRIx64, pRequest->self, pResInfo->numOfRows,
               pResInfo->totalRows, pResInfo->completed, pRequest->requestId);

724
      if (pResultInfo->numOfRows == 0) {
D
dapan1121 已提交
725 726 727 728
        return NULL;
      }

      goto _return;
729
    } else if (pRequest->type == TDMT_MND_SHOW) {
H
Haojun Liao 已提交
730
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
H
Haojun Liao 已提交
731
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
H
Haojun Liao 已提交
732 733
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
734 735 736 737 738 739 740 741 742 743 744
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);

      epSet.numOfEps = pVgroupInfo->numOfEps;
      epSet.inUse = pVgroupInfo->inUse;

      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
        epSet.port[i] = pVgroupInfo->epAddr[i].port;
      }

H
Haojun Liao 已提交
745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

H
Haojun Liao 已提交
762 763 764 765 766 767 768 769
      epSet.numOfEps = pVgroupInfo->numOfEps;
      epSet.inUse = pVgroupInfo->inUse;

      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
        epSet.port[i] = pVgroupInfo->epAddr[i].port;
      }

H
Haojun Liao 已提交
770 771
      int64_t  transporterId = 0;
      STscObj *pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
772
      asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
773 774 775
      tsem_wait(&pRequest->body.rspSem);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
776 777
    } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE && pResultInfo->pData != NULL) {
      return NULL;
H
Haojun Liao 已提交
778
    }
H
Haojun Liao 已提交
779

780
    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
H
Haojun Liao 已提交
781

782 783
    int64_t  transporterId = 0;
    STscObj *pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
784
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
785 786 787 788 789 790 791 792 793

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

D
dapan1121 已提交
794 795
_return:

H
Haojun Liao 已提交
796 797 798 799 800
  for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
801
    }
H
Haojun Liao 已提交
802 803 804 805 806 807
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

H
Haojun Liao 已提交
808 809 810 811 812 813 814 815
/*
 * Allocate the per-column row, column-start and length arrays the first
 * time they are needed; does nothing when `row` is already set.
 */
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (NULL != pResInfo->row) {
    return;
  }

  const int32_t nCols = pResInfo->numOfCols;
  pResInfo->row    = calloc(nCols, POINTER_BYTES);
  pResInfo->pCol   = calloc(nCols, POINTER_BYTES);
  pResInfo->length = calloc(nCols, sizeof(int32_t));
}

816
/*
 * Compute the start address of every column inside the result data block
 * and reset the row pointers to the first row.
 *
 * Column i starts at pData + (sum of the byte widths of columns 0..i-1) *
 * numOfRows. Nothing is done for an empty block. If the pointer arrays
 * cannot be allocated, terrno is set to TSDB_CODE_TSC_OUT_OF_MEMORY and the
 * function returns without touching them.
 */
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

  doPrepareResPtr(pResultInfo);
  if (pResultInfo->row == NULL || pResultInfo->pCol == NULL || pResultInfo->length == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  // Byte-wise arithmetic on an explicit char pointer.
  char*   pStart = (char*) pResultInfo->pData;
  int32_t offset = 0;
  for (int32_t i = 0; i < numOfCols; ++i) {
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
    pResultInfo->row[i]    = pStart + offset * pResultInfo->numOfRows;
    pResultInfo->pCol[i]   = pResultInfo->row[i];
    offset += pResultInfo->fields[i].bytes;
  }
}

834 835 836
/*
 * Return a heap-allocated copy of the connection's current database name,
 * or NULL when no database is selected. The name is read under the
 * connection mutex; the caller frees the copy.
 */
char* getConnectionDB(STscObj* pObj) {
  char *dbCopy = NULL;

  pthread_mutex_lock(&pObj->mutex);
  if (strlen(pObj->db) > 0) {
    dbCopy = strndup(pObj->db, tListLen(pObj->db));
  }
  pthread_mutex_unlock(&pObj->mutex);

  return dbCopy;
}

/* Replace the connection's current database name with `db`, under the connection mutex. */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(NULL != db && NULL != pTscObj);

  pthread_mutex_lock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
852

853
/*
 * Load a retrieve response into the result info: remember the raw message,
 * point pData at its row data, take the row count (converted with htonl)
 * and the completion flag, add the rows to the running total, and recompute
 * the per-column pointers.
 */
void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
  assert(NULL != pResultInfo && NULL != pRsp);

  // Where the data lives.
  pResultInfo->pRspMsg = (const char*) pRsp;
  pResultInfo->pData   = (void*) pRsp->data;

  // How much of it there is and whether more will follow.
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->current   = 0;

  pResultInfo->totalRows += pResultInfo->numOfRows;

  setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
}