clientImpl.c 26.0 KB
Newer Older
1

2
#include "../../libs/scheduler/inc/schedulerInt.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5 6 7
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
8 9
#include "tdef.h"
#include "tep.h"
10
#include "tglobal.h"
11
#include "tmsgtype.h"
12 13
#include "tnote.h"
#include "tpagedfile.h"
14
#include "tref.h"
X
Xiaoyu Wang 已提交
15

16
#define CHECK_CODE_GOTO(expr, label) \
H
Haojun Liao 已提交
17 18
  do {                               \
    int32_t code = expr;             \
X
Xiaoyu Wang 已提交
19
    if (TSDB_CODE_SUCCESS != code) { \
H
Haojun Liao 已提交
20
      terrno = code;                 \
21
      goto label;                    \
H
Haojun Liao 已提交
22
    }                                \
X
Xiaoyu Wang 已提交
23
  } while (0)
24

25
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
26 27
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
28
static void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
29

H
Haojun Liao 已提交
30
  static bool stringLengthCheck(const char* str, size_t maxsize) {
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
  if (str == NULL) {
    return false;
  }

  size_t len = strlen(str);
  if (len <= 0 || len > maxsize) {
    return false;
  }

  return true;
}

static bool validateUserName(const char* user) {
  return stringLengthCheck(user, TSDB_USER_LEN - 1);
}

static bool validatePassword(const char* passwd) {
48
  return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
49 50 51 52 53 54
}

static bool validateDbName(const char* db) {
  return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1);
}

55 56 57 58 59 60
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
  return strdup(key);
}

61
static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
62
static void  setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema);
63 64

TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
65 66 67 68
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

69 70 71 72 73
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

74
  char localDb[TSDB_DB_NAME_LEN] = {0};
75 76 77 78 79 80
  if (db != NULL) {
    if(!validateDbName(db)) {
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

81 82
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
83 84
  }

85
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
86 87 88 89 90 91
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

92
    taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
93 94 95 96
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

97
  SCorEpSet epSet = {0};
98 99 100 101 102 103 104 105 106 107 108 109 110 111
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.port[0] = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }

112 113
  char* key = getClusterKey(user, secretEncrypt, ip, port);

114
  // TODO: race condition here.
H
Haojun Liao 已提交
115
  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
116
  if (pInst == NULL) {
H
Haojun Liao 已提交
117 118
    SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
    p->mgmtEp       = epSet;
H
Haojun Liao 已提交
119
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
L
Liu Jicong 已提交
120
    p->pAppHbMgr = appHbMgrInit(p);
H
Haojun Liao 已提交
121
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
122

H
Haojun Liao 已提交
123
    pInst = &p;
124 125
  }

H
Haojun Liao 已提交
126
  tfree(key);
127
  return taosConnectImpl(user, &secretEncrypt[0], localDb, port, NULL, NULL, *pInst);
128 129
}

X
Xiaoyu Wang 已提交
130 131 132
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
133
    tscError("failed to malloc sqlObj");
X
Xiaoyu Wang 已提交
134
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
135 136
  }

X
Xiaoyu Wang 已提交
137 138 139 140 141
  (*pRequest)->sqlstr = malloc(sqlLen + 1);
  if ((*pRequest)->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self);
    (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
142 143
  }

X
Xiaoyu Wang 已提交
144 145 146
  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;
147

H
Haojun Liao 已提交
148
  tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
X
Xiaoyu Wang 已提交
149 150
  return TSDB_CODE_SUCCESS;
}
151

X
Xiaoyu Wang 已提交
152
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
H
Haojun Liao 已提交
153 154
  STscObj* pTscObj = pRequest->pTscObj;

X
Xiaoyu Wang 已提交
155
  SParseContext cxt = {
H
Haojun Liao 已提交
156
    .requestId = pRequest->requestId,
H
Haojun Liao 已提交
157 158 159 160 161 162
    .acctId    = pTscObj->acctId,
    .db        = getConnectionDB(pTscObj),
    .pSql      = pRequest->sqlstr,
    .sqlLen    = pRequest->sqlLen,
    .pMsg      = pRequest->msgBuf,
    .msgLen    = ERROR_MSG_BUF_DEFAULT_SIZE,
H
Haojun Liao 已提交
163
    .pTransporter = pTscObj->pAppInfo->pTransporter,
X
Xiaoyu Wang 已提交
164
  };
H
Haojun Liao 已提交
165

H
Haojun Liao 已提交
166 167
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
H
Haojun Liao 已提交
168
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
169
    tfree(cxt.db);
H
Haojun Liao 已提交
170 171 172 173
    return code;
  }

  code = qParseQuerySql(&cxt, pQuery);
174

H
Haojun Liao 已提交
175
  tfree(cxt.db);
X
Xiaoyu Wang 已提交
176 177
  return code;
}
H
Haojun Liao 已提交
178

X
Xiaoyu Wang 已提交
179 180 181
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
  SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
  pRequest->type = pDcl->msgType;
H
Haojun Liao 已提交
182
  pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen};
X
Xiaoyu Wang 已提交
183

H
Haojun Liao 已提交
184
  STscObj* pTscObj = pRequest->pTscObj;
185
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
X
Xiaoyu Wang 已提交
186

187
  int64_t transporterId = 0;
188 189 190 191
  if (pDcl->msgType == TDMT_VND_CREATE_TABLE || pDcl->msgType == TDMT_VND_SHOW_TABLES) {
    if (pDcl->msgType == TDMT_VND_SHOW_TABLES) {
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      if (pShowReqInfo->pArray == NULL) {
H
Haojun Liao 已提交
192
        pShowReqInfo->currentIndex = 0;  // set the first vnode/ then iterate the next vnode
193 194 195
        pShowReqInfo->pArray = pDcl->pExtension;
      }
    }
H
Haojun Liao 已提交
196
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
X
Xiaoyu Wang 已提交
197
  } else {
H
Haojun Liao 已提交
198
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
199 200
  }

X
Xiaoyu Wang 已提交
201 202 203
  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
204

205 206
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) {
  pRequest->type = pQueryNode->type;
207 208

  SReqResultInfo* pResInfo = &pRequest->body.resInfo;
209
  int32_t code = qCreateQueryDag(pQueryNode, pDag, pRequest->requestId);
210 211 212 213 214
  if (code != 0) {
    return code;
  }

  if (pQueryNode->type == TSDB_SQL_SELECT) {
215 216 217 218 219
    SArray* pa = taosArrayGetP((*pDag)->pSubplans, 0);

    SSubplan* pPlan = taosArrayGetP(pa, 0);
    SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema);
    setResSchemaInfo(pResInfo, pDataBlockSchema);
H
Haojun Liao 已提交
220

221
    pRequest->type = TDMT_VND_QUERY;
222 223 224
  }

  return code;
X
Xiaoyu Wang 已提交
225 226
}

227 228 229 230 231 232 233 234 235 236
void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema) {
  assert(pDataBlockSchema != NULL && pDataBlockSchema->numOfCols > 0);

  pResInfo->numOfCols = pDataBlockSchema->numOfCols;
  pResInfo->fields = calloc(pDataBlockSchema->numOfCols, sizeof(pDataBlockSchema->pSchema[0]));

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    SSchema* pSchema = &pDataBlockSchema->pSchema[i];
    pResInfo->fields[i].bytes = pSchema->bytes;
    pResInfo->fields[i].type  = pSchema->type;
237
    tstrncpy(pResInfo->fields[i].name, pSchema->name, tListLen(pResInfo->fields[i].name));
238
  }
X
Xiaoyu Wang 已提交
239 240
}

241
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
242
  if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
X
Xiaoyu Wang 已提交
243
    SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
244

H
Haojun Liao 已提交
245
    int32_t code = scheduleExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
246 247 248
    if (code != TSDB_CODE_SUCCESS) {
      // handle error and retry
    } else {
249 250
      if (pRequest->body.pQueryJob != NULL) {
        scheduleFreeJob(pRequest->body.pQueryJob);
251 252 253
      }
    }

H
Haojun Liao 已提交
254 255 256
    pRequest->body.resInfo.numOfRows = res.numOfRows;
    pRequest->code = res.code;
    return pRequest->code;
X
Xiaoyu Wang 已提交
257
  }
258

259 260
  SArray *execNode = taosArrayInit(4, sizeof(SQueryNodeAddr));

H
Haojun Liao 已提交
261
  SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 2};
262
  addr.epAddr[0].port = 6030;
H
Haojun Liao 已提交
263
  strcpy(addr.epAddr[0].fqdn, "localhost");
264 265 266

  taosArrayPush(execNode, &addr);
  return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob);
X
Xiaoyu Wang 已提交
267
}
X
Xiaoyu Wang 已提交
268

L
Liu Jicong 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
typedef struct tmq_t tmq_t;

typedef struct SMqClientTopic {
  // subscribe info
  int32_t sqlLen;
  char*   sql;
  char*   topicName;
  int64_t topicId;
  // statistics
  int64_t consumeCnt;
  // offset
  int64_t committedOffset;
  int64_t currentOffset;
  //connection info
  int32_t vgId;
  SEpSet  epSet;
} SMqClientTopic;

typedef struct tmq_resp_err_t {
  int32_t code;
} tmq_resp_err_t;

typedef struct tmq_topic_vgroup_list_t {
  char* topicName;
  int32_t vgId;
  int64_t committedOffset;
} tmq_topic_vgroup_list_t;

typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));

typedef struct tmq_conf_t{
  char*          clientId;
  char*          groupId;
  char*          ip;
  uint16_t       port;
  tmq_commit_cb* commit_cb;
} tmq_conf_t;

struct tmq_t {
  char           groupId[256];
  char           clientId[256];
  STscObj*       pTscObj;
  tmq_commit_cb* commit_cb;
  SArray*        clientTopics;  // SArray<SMqClientTopic>
};

void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
  conf->commit_cb = cb;
}

SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
  tmq_t* pTmq = (void*)param;
  SArray* pArray = taosArrayInit(0, sizeof(SKv));
  if (pArray == NULL) {
    return NULL;
  }
  SKv kv = {0};
  kv.key = malloc(256);
  if (kv.key == NULL) {
    taosArrayDestroy(pArray);
    return NULL;
  }
L
Liu Jicong 已提交
331 332 333 334 335
  strcpy(kv.key, "mq-tmp");
  kv.keyLen = strlen("mq-tmp") + 1;
  SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg));
  if (pMqHb == NULL) {
    return pArray;
L
Liu Jicong 已提交
336
  }
L
Liu Jicong 已提交
337 338 339 340 341 342 343 344 345 346 347 348
  pMqHb->consumerId = connKey.connId;
  SArray* clientTopics = pTmq->clientTopics;
  int sz = taosArrayGetSize(clientTopics);
  for (int i = 0; i < sz; i++) {
    SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
    if (pCTopic->vgId == -1) {
      pMqHb->status = 1;
      break;
    }
  }
  kv.value = pMqHb;
  kv.valueLen = sizeof(SMqHbMsg);
L
Liu Jicong 已提交
349
  taosArrayPush(pArray, &kv);
L
Liu Jicong 已提交
350 351

  return pArray;
L
Liu Jicong 已提交
352 353 354 355 356 357 358 359 360 361 362 363 364 365 366
}

tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
  tmq_t* pTmq = malloc(sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }
  strcpy(pTmq->groupId, conf->groupId);
  strcpy(pTmq->clientId, conf->clientId);
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;

  return pTmq;
}

L
Liu Jicong 已提交
367
TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
368 369 370 371
  STscObj     *pTscObj = (STscObj*)taos;
  SRequestObj *pRequest = NULL;
  SQueryNode  *pQueryNode = NULL;
  char        *pStr = NULL;
L
Liu Jicong 已提交
372 373

  terrno = TSDB_CODE_SUCCESS;
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392
  if (taos == NULL || topicName == NULL || sql == NULL) {
    tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
    tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create topic, %s", topicName);
L
Liu Jicong 已提交
393 394

  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
395
  CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
L
Liu Jicong 已提交
396

397
  // todo check for invalid sql statement and return with error code
L
Liu Jicong 已提交
398

399
  CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return);
L
Liu Jicong 已提交
400

401 402 403
  pStr = qDagToString(pRequest->body.pDag);
  if(pStr == NULL) {
    goto _return;
L
Liu Jicong 已提交
404
  }
405 406 407 408 409 410 411 412 413 414 415

  // The topic should be related to a database that the queried table is belonged to.
  SName name = {0};
  char dbName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(&((SQueryStmtInfo*) pQueryNode)->pTableMetaInfo[0]->name, dbName);

  tNameFromString(&name, dbName, T_NAME_ACCT|T_NAME_DB);
  tNameFromString(&name, topicName, T_NAME_TABLE);

  char topicFname[TSDB_TOPIC_FNAME_LEN] = {0};
  tNameExtractFullName(&name, topicFname);
L
Liu Jicong 已提交
416 417

  SCMCreateTopicReq req = {
418 419 420 421 422
    .name         = (char*) topicFname,
    .igExists     = 0,
    .physicalPlan = (char*) pStr,
    .sql          = (char*) sql,
    .logicalPlan  = "no logic plan",
L
Liu Jicong 已提交
423 424
  };

L
Liu Jicong 已提交
425 426 427 428 429
  int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
  void* buf = malloc(tlen);
  if(buf == NULL) {
    goto _return;
  }
430

L
Liu Jicong 已提交
431 432 433
  void* abuf = buf;
  tSerializeSCMCreateTopicReq(&abuf, &req);
  /*printf("formatted: %s\n", dagStr);*/
L
Liu Jicong 已提交
434 435

  pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
436
  pRequest->type = TDMT_MND_CREATE_TOPIC;
L
Liu Jicong 已提交
437

438
  SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
439
  SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
L
Liu Jicong 已提交
440 441

  int64_t transporterId = 0;
442
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
L
Liu Jicong 已提交
443 444 445 446

  tsem_wait(&pRequest->body.rspSem);

_return:
447 448 449 450 451
  qDestroyQuery(pQueryNode);
  if (body != NULL) {
    destroySendMsgInfo(body);
  }

L
Liu Jicong 已提交
452 453 454
  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }
455

L
Liu Jicong 已提交
456 457 458
  return pRequest;
}

L
Liu Jicong 已提交
459
typedef struct tmq_message_t {
H
Haojun Liao 已提交
460
  int32_t  numOfRows;
L
Liu Jicong 已提交
461 462 463 464 465 466 467 468 469 470 471 472 473
  char*    topicName;
  TAOS_ROW row[];
} tmq_message_t;

tmq_message_t* tmq_consume_poll(tmq_t* mq, int64_t blocking_time) {
  return NULL;
}

tmq_resp_err_t* tmq_commit(tmq_t* mq, void* callback, int32_t async) {
  return NULL;
}

void tmq_message_destroy(tmq_message_t* mq_message) {
H
Haojun Liao 已提交
474

L
Liu Jicong 已提交
475 476 477
}


X
Xiaoyu Wang 已提交
478 479 480 481 482 483
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
  STscObj *pTscObj = (STscObj *)taos;
  if (sqlLen > (size_t) tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
484 485
  }

X
Xiaoyu Wang 已提交
486 487
  nPrintTsc("%s", sql)

H
Haojun Liao 已提交
488
  SRequestObj *pRequest = NULL;
489
  SQueryNode  *pQueryNode = NULL;
X
Xiaoyu Wang 已提交
490

X
Xiaoyu Wang 已提交
491
  terrno = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
492
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
493
  CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
H
Haojun Liao 已提交
494

495 496
  if (qIsDdlQuery(pQueryNode)) {
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return);
H
Haojun Liao 已提交
497
  } else {
498
    CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag), _return);
H
Haojun Liao 已提交
499
    CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag), _return);
X
Xiaoyu Wang 已提交
500
    pRequest->code = terrno;
X
Xiaoyu Wang 已提交
501 502 503
  }

_return:
504
  qDestroyQuery(pQueryNode);
X
Xiaoyu Wang 已提交
505
  if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
X
Xiaoyu Wang 已提交
506 507
    pRequest->code = terrno;
  }
508

509 510 511
  return pRequest;
}

512
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
513 514
  pEpSet->version = 0;

H
Haojun Liao 已提交
515
  // init mnode ip set
516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547
  SEpSet *mgmtEpSet   = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse    = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(firstEp, mgmtEpSet->fqdn[0], &(mgmtEpSet->port[0]));
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(secondEp, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
    mgmtEpSet->numOfEps++;
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

548
STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
549
  STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
550 551 552 553 554
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

H
Hongze Cheng 已提交
555
  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
556 557 558
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
559 560 561
    return NULL;
  }

562
  SMsgSendInfo* body = buildConnectMsg(pRequest);
563 564

  int64_t transporterId = 0;
H
Haojun Liao 已提交
565
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
566 567 568 569 570 571 572 573 574 575

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
H
Haojun Liao 已提交
576
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
577 578 579 580 581 582
    destroyRequest(pRequest);
  }

  return pTscObj;
}

583 584 585 586 587 588 589
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
  SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

H
Haojun Liao 已提交
590
  pMsgSendInfo->msgType         = TDMT_MND_CONNECT;
S
Shengliang Guan 已提交
591
  pMsgSendInfo->msgInfo.len     = sizeof(SConnectReq);
592 593
  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId       = pRequest->requestId;
D
catalog  
dapan1121 已提交
594
  pMsgSendInfo->fp              = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
595
  pMsgSendInfo->param           = pRequest;
596

S
Shengliang Guan 已提交
597
  SConnectReq *pConnect = calloc(1, sizeof(SConnectReq));
598
  if (pConnect == NULL) {
599
    tfree(pMsgSendInfo);
600
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
601
    return NULL;
602 603
  }

604 605
  STscObj *pObj = pRequest->pTscObj;

606
  char* db = getConnectionDB(pObj);
607 608 609
  if (db != NULL) {
    tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  }
610
  tfree(db);
611

612 613 614
  pConnect->pid = htonl(appInfo.pid);
  pConnect->startTime = htobe64(appInfo.startTime);
  tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
615

616 617
  pMsgSendInfo->msgInfo.pData = pConnect;
  return pMsgSendInfo;
618 619
}

620
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
H
Haojun Liao 已提交
621
  assert(pMsgBody != NULL);
622 623
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
H
Haojun Liao 已提交
624 625
}

626
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
627 628
  SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
  assert(pMsg->ahandle != NULL);
629

630
  if (pSendInfo->requestObjRefId != 0) {
H
Haojun Liao 已提交
631
    SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
632
    assert(pRequest->self == pSendInfo->requestObjRefId);
633

634 635
    pRequest->metric.rsp = taosGetTimestampMs();
    pRequest->code = pMsg->code;
636

637 638 639 640 641
    STscObj *pTscObj = pRequest->pTscObj;
    if (pEpSet) {
      if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      }
642 643
    }

644
    /*
645 646
   * There is not response callback function for submit response.
   * The actual inserted number of points is the first number.
647
     */
648
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
649
    if (pMsg->code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
650
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
651
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
652
    } else {
H
Haojun Liao 已提交
653
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
654
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
655
    }
656

H
Haojun Liao 已提交
657
    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
658 659
  }

660 661 662 663 664 665 666 667 668 669
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
670 671
  }

672
  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
673
  rpcFreeCont(pMsg->pCont);
674
  destroySendMsgInfo(pSendInfo);
675
}
676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701

TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN]      = {0};
  char dbStr[TSDB_DB_NAME_LEN] = {0};
  char userStr[TSDB_USER_LEN]  = {0};
  char passStr[TSDB_PASSWORD_LEN]   = {0};

  strncpy(ipStr,   ip,   MIN(TSDB_EP_LEN - 1, ipLen));
  strncpy(userStr, user, MIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, MIN(TSDB_PASSWORD_LEN - 1, passLen));
  strncpy(dbStr,   db,   MIN(TSDB_DB_NAME_LEN - 1, dbLen));
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
702 703 704 705
}

void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
706
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
707

H
Haojun Liao 已提交
708 709
  SEpSet epSet = {0};

H
Haojun Liao 已提交
710
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
711
    if (pRequest->type == TDMT_VND_QUERY) {
712 713 714 715 716
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        return NULL;
      }

717 718 719 720 721
      int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
      if (code != TSDB_CODE_SUCCESS) {
        pRequest->code = code;
        return NULL;
      }
H
Haojun Liao 已提交
722

723
      setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData);
724
      if (pResultInfo->numOfRows == 0) {
D
dapan1121 已提交
725 726 727 728
        return NULL;
      }

      goto _return;
729
    } else if (pRequest->type == TDMT_MND_SHOW) {
H
Haojun Liao 已提交
730 731 732
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
733 734 735 736 737 738 739 740 741 742 743
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);

      epSet.numOfEps = pVgroupInfo->numOfEps;
      epSet.inUse = pVgroupInfo->inUse;

      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
        epSet.port[i] = pVgroupInfo->epAddr[i].port;
      }

H
Haojun Liao 已提交
744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

H
Haojun Liao 已提交
761 762 763 764 765 766 767 768
      epSet.numOfEps = pVgroupInfo->numOfEps;
      epSet.inUse = pVgroupInfo->inUse;

      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
        epSet.port[i] = pVgroupInfo->epAddr[i].port;
      }

H
Haojun Liao 已提交
769 770
      int64_t  transporterId = 0;
      STscObj *pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
771
      asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
772 773 774
      tsem_wait(&pRequest->body.rspSem);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
775
    }
H
Haojun Liao 已提交
776

777
    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
H
Haojun Liao 已提交
778

779 780
    int64_t  transporterId = 0;
    STscObj *pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
781
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
782 783 784 785 786 787 788 789 790

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

D
dapan1121 已提交
791 792
_return:

H
Haojun Liao 已提交
793 794 795 796 797
  for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
798
    }
H
Haojun Liao 已提交
799 800 801 802 803 804
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

H
Haojun Liao 已提交
805 806 807 808 809 810 811 812
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row == NULL) {
    pResInfo->row    = calloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol   = calloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
  }
}

813
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
H
Haojun Liao 已提交
814 815 816 817 818
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

H
Haojun Liao 已提交
819 820 821
  // todo check for the failure of malloc
  doPrepareResPtr(pResultInfo);

H
Haojun Liao 已提交
822 823 824
  int32_t offset = 0;
  for (int32_t i = 0; i < numOfCols; ++i) {
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
H
Haojun Liao 已提交
825
    pResultInfo->row[i]    = (char*) (pResultInfo->pData + offset * pResultInfo->numOfRows);
H
Haojun Liao 已提交
826 827
    pResultInfo->pCol[i]   = pResultInfo->row[i];
    offset += pResultInfo->fields[i].bytes;
828
  }
S
Shengliang Guan 已提交
829 830
}

831 832 833
char* getConnectionDB(STscObj* pObj) {
  char *p = NULL;
  pthread_mutex_lock(&pObj->mutex);
834 835 836 837
  size_t len = strlen(pObj->db);
  if (len > 0) {
    p = strndup(pObj->db, tListLen(pObj->db));
  }
S
Shengliang Guan 已提交
838

839
  pthread_mutex_unlock(&pObj->mutex);
840 841 842 843 844 845 846 847 848
  return p;
}

void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
  pthread_mutex_lock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
849

850 851 852 853
void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
  assert(pResultInfo != NULL && pRsp != NULL);

  pResultInfo->pRspMsg   = (const char*) pRsp;
H
Haojun Liao 已提交
854 855
  pResultInfo->pData     = (void*) pRsp->data;
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
856 857
  pResultInfo->current   = 0;
  pResultInfo->completed = (pRsp->completed == 1);
H
Haojun Liao 已提交
858 859

  setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
L
fix  
Liu Jicong 已提交
860
}