clientImpl.c 24.9 KB
Newer Older
1

2
#include "../../libs/scheduler/inc/schedulerInt.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5 6 7
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
8 9
#include "tdef.h"
#include "tep.h"
10
#include "tglobal.h"
11
#include "tmsgtype.h"
12 13
#include "tnote.h"
#include "tpagedfile.h"
14
#include "tref.h"
X
Xiaoyu Wang 已提交
15

16
/*
 * Evaluates `expr` exactly once. When the result differs from
 * TSDB_CODE_SUCCESS it is stored in terrno and control jumps to `label`.
 *
 * The temporary carries an unusual name so that an `expr` mentioning a caller
 * variable called `code` is not shadowed by the macro's own local.
 */
#define CHECK_CODE_GOTO(expr, label)            \
  do {                                          \
    int32_t checkedCode_ = (expr);              \
    if (TSDB_CODE_SUCCESS != checkedCode_) {    \
      terrno = checkedCode_;                    \
      goto label;                               \
    }                                           \
  } while (0)
24

25
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
26 27
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
28
static void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
29

H
Haojun Liao 已提交
30
  /*
   * Returns true when `str` is non-NULL, non-empty and holds at most `maxsize`
   * characters (the terminating NUL is not counted); returns false otherwise.
   */
  static bool stringLengthCheck(const char* str, size_t maxsize) {
    if (str == NULL) {
      return false;
    }

    // size_t is unsigned, so "empty" is simply a length of zero
    const size_t len = strlen(str);
    return len > 0 && len <= maxsize;
  }

/* A user name is valid when it is non-empty and fits in TSDB_USER_LEN - 1 characters. */
static bool validateUserName(const char* user) {
  const size_t maxUserLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxUserLen);
}

static bool validatePassword(const char* passwd) {
48
  return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
49 50 51 52 53 54
}

/* A database name is valid when it is non-empty and fits in TSDB_DB_NAME_LEN - 1 characters. */
static bool validateDbName(const char* db) {
  const size_t maxDbNameLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxDbNameLen);
}

55 56 57 58 59 60
/*
 * Builds the "user:auth:ip:port" key that identifies an application instance.
 *
 * A NULL `user`, `auth` or `ip` is rendered as an empty field instead of being
 * handed to "%s" (undefined behaviour). The buffer is sized from the formatted
 * length, so long inputs are no longer truncated at 511 characters.
 *
 * Returns a heap-allocated string owned by the caller, or NULL when formatting
 * or allocation fails.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  static const char fmt[] = "%s:%s:%s:%d";

  const char* safeUser = (user != NULL) ? user : "";
  const char* safeAuth = (auth != NULL) ? auth : "";
  const char* safeIp   = (ip != NULL) ? ip : "";

  // first pass only measures the formatted length
  int len = snprintf(NULL, 0, fmt, safeUser, safeAuth, safeIp, (int)port);
  if (len < 0) {
    return NULL;
  }

  char* key = malloc((size_t)len + 1);
  if (key == NULL) {
    return NULL;
  }

  snprintf(key, (size_t)len + 1, fmt, safeUser, safeAuth, safeIp, (int)port);
  return key;
}

61
static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
62
static void  setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema);
63 64

TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
65 66 67 68
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

69 70 71 72 73
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

74
  char localDb[TSDB_DB_NAME_LEN] = {0};
75 76 77 78 79 80
  if (db != NULL) {
    if(!validateDbName(db)) {
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

81 82
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
83 84
  }

85
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
86 87 88 89 90 91
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

92
    taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
93 94 95 96
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

97
  SCorEpSet epSet = {0};
98 99 100 101 102 103 104 105 106 107 108 109 110 111
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.port[0] = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }

112 113
  char* key = getClusterKey(user, secretEncrypt, ip, port);

114
  // TODO: race condition here.
H
Haojun Liao 已提交
115
  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
116
  if (pInst == NULL) {
H
Haojun Liao 已提交
117 118
    SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
    p->mgmtEp       = epSet;
H
Haojun Liao 已提交
119
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
L
Liu Jicong 已提交
120
    p->pAppHbMgr = appHbMgrInit(p);
H
Haojun Liao 已提交
121
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
122

H
Haojun Liao 已提交
123
    pInst = &p;
124 125
  }

H
Haojun Liao 已提交
126
  tfree(key);
127
  return taosConnectImpl(user, &secretEncrypt[0], localDb, port, NULL, NULL, *pInst);
128 129
}

X
Xiaoyu Wang 已提交
130 131 132
/*
 * Creates a request object for `pTscObj` and stores a lower-cased,
 * NUL-terminated copy of the first `sqlLen` bytes of `sql` in it.
 *
 * The new request is written to *pRequest (NULL when creation failed).
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the request
 * or its SQL buffer cannot be allocated; in the latter case *pRequest still
 * points at the request.
 */
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
  SRequestObj* pReq = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  *pRequest = pReq;
  if (pReq == NULL) {
    tscError("failed to malloc sqlObj");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pReq->sqlstr = malloc(sqlLen + 1);
  if (pReq->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to prepare sql string buffer", pReq->self);
    pReq->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // copy + lower-case, then terminate explicitly
  strntolower(pReq->sqlstr, sql, (int32_t)sqlLen);
  pReq->sqlstr[sqlLen] = 0;
  pReq->sqlLen = sqlLen;

  tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, pReq->self, pReq->sqlstr, pReq->requestId);
  return TSDB_CODE_SUCCESS;
}
151

X
Xiaoyu Wang 已提交
152
/*
 * Fills a parse context from the request and its connection, obtains the
 * catalog handle for the connection's cluster and parses the request's SQL
 * into *pQuery.
 *
 * Returns the code from catalogGetHandle() when that fails, otherwise the code
 * from qParseQuerySql(). The database-name copy taken for the context is
 * released on every path.
 */
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
  STscObj* pTscObj = pRequest->pTscObj;

  SParseContext cxt = {0};
  cxt.requestId    = pRequest->requestId;
  cxt.acctId       = pTscObj->acctId;
  cxt.db           = getConnectionDB(pTscObj);
  cxt.pSql         = pRequest->sqlstr;
  cxt.sqlLen       = pRequest->sqlLen;
  cxt.pMsg         = pRequest->msgBuf;
  cxt.msgLen       = ERROR_MSG_BUF_DEFAULT_SIZE;
  cxt.pTransporter = pTscObj->pAppInfo->pTransporter;
  cxt.mgmtEpSet    = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
  if (code == TSDB_CODE_SUCCESS) {
    code = qParseQuerySql(&cxt, pQuery);
  }

  tfree(cxt.db);
  return code;
}
H
Haojun Liao 已提交
178

X
Xiaoyu Wang 已提交
179 180 181
/*
 * Sends the already-serialized DDL/DCL message carried by `pQuery` and blocks
 * on the request's response semaphore.
 *
 * TDMT_VND_CREATE_TABLE and TDMT_VND_SHOW_TABLES go to the endpoint set stored
 * in the statement; every other message type goes to the management endpoint
 * set. For TDMT_VND_SHOW_TABLES the show-iteration state is initialised on the
 * first call. Always returns TSDB_CODE_SUCCESS.
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
  SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;

  pRequest->type = pDcl->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen};

  STscObj*      pTscObj  = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);

  const bool isShowTables = (pDcl->msgType == TDMT_VND_SHOW_TABLES);
  const bool toVnode      = (pDcl->msgType == TDMT_VND_CREATE_TABLE) || isShowTables;

  if (isShowTables) {
    SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
    if (pShowReqInfo->pArray == NULL) {
      pShowReqInfo->currentIndex = 0;  // set the first vnode/ then iterate the next vnode
      pShowReqInfo->pArray = pDcl->pExtension;
    }
  }

  int64_t transporterId = 0;
  if (toVnode) {
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
  } else {
    SEpSet* pMgmtEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, pMgmtEpSet, &transporterId, pSendMsg);
  }

  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
205

206 207
/*
 * Creates the query DAG for `pQueryNode` into *pDag.
 *
 * For TSDB_SQL_SELECT the result schema is taken from the data sink of the
 * first subplan in the first subplan list, and the request type becomes
 * TDMT_VND_QUERY. Returns the code from qCreateQueryDag().
 */
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) {
  pRequest->type = pQueryNode->type;

  int32_t code = qCreateQueryDag(pQueryNode, pDag, pRequest->requestId);
  if (code != 0) {
    return code;
  }

  if (pQueryNode->type != TSDB_SQL_SELECT) {
    return code;
  }

  SArray*   pFirstList = taosArrayGetP((*pDag)->pSubplans, 0);
  SSubplan* pFirstPlan = taosArrayGetP(pFirstList, 0);
  setResSchemaInfo(&pRequest->body.resInfo, &(pFirstPlan->pDataSink->schema));

  pRequest->type = TDMT_VND_QUERY;
  return code;
}

228 229 230 231 232 233 234 235 236 237
/*
 * Allocates pResInfo->fields and fills one entry per column of
 * `pDataBlockSchema` (bytes, type and name).
 *
 * The array is sized from the destination element type; it used to be sized
 * from the source schema element, which only works while that type is at
 * least as large. On allocation failure terrno is set to
 * TSDB_CODE_TSC_OUT_OF_MEMORY and numOfCols is reset to 0 so that no caller
 * iterates over a NULL array.
 */
void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema) {
  assert(pDataBlockSchema != NULL && pDataBlockSchema->numOfCols > 0);

  pResInfo->numOfCols = pDataBlockSchema->numOfCols;
  pResInfo->fields = calloc(pDataBlockSchema->numOfCols, sizeof(pResInfo->fields[0]));
  if (pResInfo->fields == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    pResInfo->numOfCols = 0;
    return;
  }

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    const SSchema* pSchema = &pDataBlockSchema->pSchema[i];
    pResInfo->fields[i].bytes = pSchema->bytes;
    pResInfo->fields[i].type  = pSchema->type;
    tstrncpy(pResInfo->fields[i].name, pSchema->name, tListLen(pResInfo->fields[i].name));
  }
}

/*
 * Hands the DAG to the scheduler.
 *
 * TSDB_SQL_INSERT and TSDB_SQL_CREATE_TABLE requests run synchronously: the
 * affected-row count and the result code are copied into the request and the
 * job is released once it has succeeded. Every other request type is started
 * asynchronously.
 *
 * Returns the request's final code for the synchronous path, or the code from
 * scheduleAsyncExecJob() otherwise.
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
  if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
    SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};

    int32_t code = scheduleExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
    if (code == TSDB_CODE_SUCCESS && pRequest->body.pQueryJob != NULL) {
      scheduleFreeJob(pRequest->body.pQueryJob);
    }
    // todo: handle error and retry

    pRequest->body.resInfo.numOfRows = res.numOfRows;
    // A scheduler failure that never reached `res` must not be reported as success.
    pRequest->code = (res.code != TSDB_CODE_SUCCESS) ? res.code : code;
    return pRequest->code;
  }

  return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
}
X
Xiaoyu Wang 已提交
262

L
Liu Jicong 已提交
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
// Forward declaration of the consumer handle; the struct body follows below.
typedef struct tmq_t tmq_t;

// Per-topic record: subscription SQL and name, a consume counter, offsets and
// the vgroup id / endpoint set used to reach the topic.
typedef struct SMqClientTopic {
  // subscribe info
  int32_t sqlLen;
  char*   sql;
  char*   topicName;
  int64_t topicId;
  // statistics
  int64_t consumeCnt;
  // offset
  int64_t committedOffset;
  int64_t currentOffset;
  // connection info
  int32_t vgId;
  SEpSet  epSet;
} SMqClientTopic;

// Error value passed to the commit callback; carries a single result code.
typedef struct tmq_resp_err_t {
  int32_t code;
} tmq_resp_err_t;

// Topic name, vgroup id and committed offset handed to the commit callback.
typedef struct tmq_topic_vgroup_list_t {
  char* topicName;
  int32_t vgId;
  int64_t committedOffset;
} tmq_topic_vgroup_list_t;

// Function type of the offset-commit callback: consumer, result, topic/vgroup
// entry and the user-supplied `param`.
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));

// Settings supplied when creating a consumer (see tmqCreateConsumerImpl).
typedef struct tmq_conf_t{
  char*          clientId;
  char*          groupId;
  char*          ip;
  uint16_t       port;
  tmq_commit_cb* commit_cb;
} tmq_conf_t;

// Consumer handle: fixed-size group/client id buffers, the underlying
// connection object, the commit callback and the subscribed topics.
struct tmq_t {
  char           groupId[256];
  char           clientId[256];
  STscObj*       pTscObj;
  tmq_commit_cb* commit_cb;
  SArray*        clientTopics;  // SArray<SMqClientTopic>
};

// Stores `cb` as the offset-commit callback of `conf`. `conf` must not be NULL.
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
  conf->commit_cb = cb;
}

/* Releases every key/value buffer stored in `pArray`, then the array itself. */
static void tmqDestroyKvArray(SArray* pArray) {
  size_t num = taosArrayGetSize(pArray);
  for (size_t i = 0; i < num; ++i) {
    SKv* pKv = taosArrayGet(pArray, i);
    free(pKv->key);
    free(pKv->value);
  }
  taosArrayDestroy(pArray);
}

/*
 * Appends a deep copy of (key, value) to `pArray`. `key` is a NUL-terminated
 * string whose terminator is included in keyLen; `value` is `valueLen` raw bytes.
 * Returns 0 on success and -1 when an allocation or the push fails.
 */
static int32_t tmqAppendKv(SArray* pArray, const char* key, const void* value, size_t valueLen) {
  size_t keyLen = strlen(key) + 1;

  SKv kv = {0};
  kv.key   = malloc(keyLen);
  kv.value = malloc(valueLen);
  if (kv.key == NULL || kv.value == NULL) {
    free(kv.key);
    free(kv.value);
    return -1;
  }

  memcpy(kv.key, key, keyLen);
  memcpy(kv.value, value, valueLen);
  kv.keyLen   = keyLen;
  kv.valueLen = valueLen;

  if (taosArrayPush(pArray, &kv) == NULL) {
    free(kv.key);
    free(kv.value);
    return -1;
  }
  return 0;
}

/*
 * Builds the key/value list describing a consumer: "groupId" (the consumer's
 * group id string, terminator included) and "clientUid" (the connection id as
 * a uint32_t).
 *
 * `param` is the tmq_t consumer; `connKey` is not used. Each pair owns its own
 * buffers; the previous code pushed only the first pair, overwrote its buffers
 * while preparing the second, and then returned NULL, leaking everything.
 *
 * Returns the array, or NULL when any allocation fails.
 * NOTE(review): the caller is assumed to release each entry's key/value and
 * the array - confirm against the heartbeat code that invokes this callback.
 */
SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
  tmq_t* pTmq = (tmq_t*)param;

  SArray* pArray = taosArrayInit(0, sizeof(SKv));
  if (pArray == NULL) {
    return NULL;
  }

  uint32_t clientUid = pTmq->pTscObj->connId;
  if (tmqAppendKv(pArray, "groupId", pTmq->groupId, strlen(pTmq->groupId) + 1) != 0 ||
      tmqAppendKv(pArray, "clientUid", &clientUid, sizeof(clientUid)) != 0) {
    tmqDestroyKvArray(pArray);
    return NULL;
  }

  return pArray;
}

/*
 * Creates a consumer bound to the connection `conn` using the group id,
 * client id and commit callback from `conf`, and marks the connection as a
 * message-queue connection.
 *
 * Returns NULL with terrno set when an argument is missing
 * (TSDB_CODE_TSC_INVALID_INPUT), when an id does not fit into the consumer's
 * fixed-size buffer (TSDB_CODE_TSC_INVALID_INPUT) or when allocation fails
 * (TSDB_CODE_TSC_OUT_OF_MEMORY). The consumer is zero-initialised, so
 * clientTopics starts out as NULL instead of an indeterminate pointer.
 */
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
  if (conn == NULL || conf == NULL || conf->groupId == NULL || conf->clientId == NULL) {
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    return NULL;
  }

  tmq_t* pTmq = calloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  // reject ids that would overflow the fixed-size buffers instead of truncating them
  if (strlen(conf->groupId) >= sizeof(pTmq->groupId) || strlen(conf->clientId) >= sizeof(pTmq->clientId)) {
    free(pTmq);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    return NULL;
  }

  tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  pTmq->commit_cb = conf->commit_cb;
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;

  return pTmq;
}

358 359 360 361 362
/*
 * Creates a topic named `topicName` from the query `sql` (length `sqlLen`).
 *
 * The SQL is parsed and planned, the plan is rendered to a string, and a
 * create-topic request is sent to the management node; the call blocks until
 * the response arrives.
 *
 * Returns the request object (its `code` carries terrno when a step failed),
 * or NULL when the failure happened before a request could be created.
 *
 * The send-info object is not touched after asyncSendMsgToServer():
 * processMsgFromServer() destroys it when the response is delivered, so
 * destroying it here as well was a double free, and on early error paths the
 * variable had not even been initialised.
 */
TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
  STscObj     *pTscObj = (STscObj*)taos;
  SRequestObj *pRequest = NULL;
  SQueryNode  *pQueryNode = NULL;
  char        *pStr = NULL;

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || topicName == NULL || sql == NULL) {
    // at least one of the strings is NULL here; never hand NULL to "%s"
    tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos,
             (topicName != NULL) ? topicName : "(null)", (sql != NULL) ? sql : "(null)");
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
    tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create topic, %s", topicName);

  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);

  // todo check for invalid sql statement and return with error code

  CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return);

  pStr = qDagToString(pRequest->body.pDag);
  if (pStr == NULL) {
    // do not report success when the plan could not be rendered
    if (terrno == TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    goto _return;
  }
  // NOTE(review): pStr is never released in this function; confirm whether
  // qDagToString() returns heap memory that should be freed after serialization.

  // The topic should be related to a database that the queried table is belonged to.
  SName name = {0};
  char dbName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(&((SQueryStmtInfo*) pQueryNode)->pTableMetaInfo[0]->name, dbName);

  tNameFromString(&name, dbName, T_NAME_ACCT|T_NAME_DB);
  tNameFromString(&name, topicName, T_NAME_TABLE);

  char topicFname[TSDB_TOPIC_FNAME_LEN] = {0};
  tNameExtractFullName(&name, topicFname);

  SCMCreateTopicReq req = {
    .name         = (char*) topicFname,
    .igExists     = 0,
    .physicalPlan = (char*) pStr,
    .sql          = (char*) sql,
    .logicalPlan  = "no logic plan",
  };

  // first call only computes the serialized length
  int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
  void* buf = malloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _return;
  }

  void* abuf = buf;
  tSerializeSCMCreateTopicReq(&abuf, &req);

  pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
  pRequest->type = TDMT_MND_CREATE_TOPIC;

  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
  SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, pSendMsg);

  tsem_wait(&pRequest->body.rspSem);

_return:
  qDestroyQuery(pQueryNode);

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}

L
Liu Jicong 已提交
450
typedef struct tmq_message_t {
H
Haojun Liao 已提交
451
  int32_t  numOfRows;
L
Liu Jicong 已提交
452 453 454 455 456 457 458 459 460 461 462 463 464
  char*    topicName;
  TAOS_ROW row[];
} tmq_message_t;

// Stub: polling is not implemented here; both arguments are ignored and NULL is
// always returned.
tmq_message_t* tmq_consume_poll(tmq_t* mq, int64_t blocking_time) {
  return NULL;
}

// Stub: committing is not implemented here; all arguments are ignored and NULL
// is always returned.
tmq_resp_err_t* tmq_commit(tmq_t* mq, void* callback, int32_t async) {
  return NULL;
}

/* Stub: nothing is released yet, so the message is left untouched. */
void tmq_message_destroy(tmq_message_t* mq_message) {
  (void)mq_message;
}


X
Xiaoyu Wang 已提交
469 470 471 472 473 474
/*
 * Executes the first `sqlLen` bytes of `sql` on the connection `taos`.
 *
 * The statement is parsed; DDL statements are sent directly and waited for,
 * everything else is planned and passed to the scheduler.
 *
 * Returns NULL (terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT) when the statement is
 * too long. Otherwise returns the request object, whose `code` holds terrno
 * when any step failed; this may be NULL when the request could not be created.
 */
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
  STscObj *pTscObj = (STscObj *)taos;
  if (sqlLen > (size_t) tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  nPrintTsc("%s", sql)

  SRequestObj *pRequest   = NULL;
  SQueryNode  *pQueryNode = NULL;

  terrno = TSDB_CODE_SUCCESS;
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);

  if (!qIsDdlQuery(pQueryNode)) {
    // regular statement: plan it, then hand the DAG to the scheduler
    CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag), _return);
    CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag), _return);
    pRequest->code = terrno;
  } else {
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return);
  }

_return:
  qDestroyQuery(pQueryNode);
  if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
    pRequest->code = terrno;
  }

  return pRequest;
}

503
/*
 * Resets `pEpSet` and fills its management endpoint list from up to two
 * "fqdn:port" strings. NULL or empty strings are skipped.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_TSC_INVALID_FQDN
 * when an endpoint string is TSDB_EP_LEN characters or longer, or when neither
 * endpoint was usable.
 */
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet *mgmtEpSet   = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse    = 0;

  const char *candidates[2] = {firstEp, secondEp};
  for (int32_t i = 0; i < 2; ++i) {
    const char *ep = candidates[i];
    if (ep == NULL || ep[0] == 0) {
      continue;
    }

    if (strlen(ep) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    // each accepted endpoint takes the next free slot
    taosGetFqdnPortFromEp(ep, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
    mgmtEpSet->numOfEps++;
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

539
/*
 * Creates a connection object on top of `pAppInfo`, sends the connect message
 * to the management endpoint set and waits for the response.
 *
 * `fp` and `param` are forwarded to the connect request; `port` is not used in
 * this function.
 *
 * Returns the connection object on success. Returns NULL when an object cannot
 * be allocated (terrno = TSDB_CODE_TSC_OUT_OF_MEMORY), when the connect message
 * cannot be built (terrno as set by buildConnectMsg) or when the server
 * rejects the connection (the reason is printed to stdout).
 */
STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
  STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgSendInfo* body = buildConnectMsg(pRequest);
  if (body == NULL) {
    // nothing to send; release what was created, same as the failure path below
    destroyRequest(pRequest);
    taos_close(pTscObj);
    return NULL;
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
    destroyRequest(pRequest);
  }

  return pTscObj;
}

574 575 576 577 578 579 580
/*
 * Allocates the send-info object and the SConnectReq payload for a connect
 * request. The payload carries the connection's current database (if any),
 * the process id and start time in network byte order, and the application
 * name.
 *
 * Returns the send-info object, or NULL with terrno =
 * TSDB_CODE_TSC_OUT_OF_MEMORY when either allocation fails.
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
  SMsgSendInfo *pSendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pSendInfo->msgType         = TDMT_MND_CONNECT;
  pSendInfo->msgInfo.len     = sizeof(SConnectReq);
  pSendInfo->requestObjRefId = pRequest->self;
  pSendInfo->requestId       = pRequest->requestId;
  pSendInfo->fp              = handleRequestRspFp[TMSG_INDEX(pSendInfo->msgType)];
  pSendInfo->param           = pRequest;

  SConnectReq *pReq = calloc(1, sizeof(SConnectReq));
  if (pReq == NULL) {
    tfree(pSendInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  // getConnectionDB() hands back a private copy (or NULL) that must be released
  char* dbCopy = getConnectionDB(pRequest->pTscObj);
  if (dbCopy != NULL) {
    tstrncpy(pReq->db, dbCopy, sizeof(pReq->db));
  }
  tfree(dbCopy);

  pReq->pid       = htonl(appInfo.pid);
  pReq->startTime = htobe64(appInfo.startTime);
  tstrncpy(pReq->app, appInfo.appName, tListLen(pReq->app));

  pSendInfo->msgInfo.pData = pReq;
  return pSendInfo;
}

611
/* Releases a send-info object together with its message payload. `pMsgBody` must not be NULL. */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(pMsgBody != NULL);

  // payload first, then the container that points at it
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
}

617
/*
 * Handles one response message from the transport layer.
 *
 * When the send-info refers to a request, the response time and code are
 * recorded on it, the management endpoint set is refreshed if the server
 * reported a different one, and the outcome is logged. The payload is then
 * copied into a fresh buffer and passed to the send-info's callback together
 * with the response code. Finally the transport buffer and the send-info are
 * released.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
  assert(pMsg->ahandle != NULL);

  if (pSendInfo->requestObjRefId != 0) {
    SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    assert(pRequest->self == pSendInfo->requestObjRefId);

    pRequest->metric.rsp = taosGetTimestampMs();
    pRequest->code = pMsg->code;

    STscObj *pTscObj = pRequest->pTscObj;
    if (pEpSet && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
    if (pMsg->code != TSDB_CODE_SUCCESS) {
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
    } else {
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
    }

    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
  }

  // hand the callback its own copy of the payload
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};
  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData != NULL) {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692

/*
 * Connects with a pre-computed secret (`auth`) instead of a password.
 *
 * A NULL `user` is replaced by TSDB_DEFAULT_USER; a NULL `auth` is rejected
 * and NULL is returned. `ip` and `db` may be NULL and are forwarded as they
 * are. Returns whatever taos_connect_internal() returns.
 */
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  // ip and db are optional, so never pass them to "%s" while they may be NULL
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", (ip != NULL) ? ip : "(null)", port, user,
           (db != NULL) ? db : "(null)");

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

/*
 * Copies at most `srcLen` characters of `src` into `dst`, never more than
 * `dstSize - 1`, and always leaves `dst` NUL-terminated. A NULL source or a
 * non-positive length leaves `dst` unchanged.
 */
static void copyLenBoundedStr(char* dst, size_t dstSize, const char* src, int srcLen) {
  if (dst == NULL || dstSize == 0 || src == NULL || srcLen <= 0) {
    return;
  }

  size_t n = MIN(dstSize - 1, (size_t)srcLen);
  strncpy(dst, src, n);
  dst[n] = 0;
}

/*
 * Length-delimited variant of taos_connect(): each argument is copied into a
 * NUL-terminated local buffer (truncated to the buffer size) before the call.
 *
 * NULL pointers and non-positive lengths produce an empty string. Previously a
 * NULL pointer crashed in strncpy, and a negative length was converted to a
 * huge size_t that overflowed the stack buffer.
 */
TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN]         = {0};
  char dbStr[TSDB_DB_NAME_LEN]    = {0};
  char userStr[TSDB_USER_LEN]     = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};

  copyLenBoundedStr(ipStr,   sizeof(ipStr),   ip,   ipLen);
  copyLenBoundedStr(userStr, sizeof(userStr), user, userLen);
  copyLenBoundedStr(passStr, sizeof(passStr), pass, passLen);
  copyLenBoundedStr(dbStr,   sizeof(dbStr),   db,   dbLen);
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
}

/*
 * Returns the next row of the request's result set, or NULL when no row is
 * available.
 *
 * When the buffered block is missing or exhausted, more data is requested
 * first:
 *  - TDMT_VND_QUERY: rows are fetched through the scheduler, unless the
 *    result was already marked completed.
 *  - TDMT_MND_SHOW / TDMT_VND_SHOW_TABLES: the request type is switched to the
 *    matching retrieve/fetch type and a message is sent.
 *  - TDMT_VND_SHOW_TABLES_FETCH: the show iteration advances to the next
 *    vgroup, a show-tables request is sent for it, then the fetch is sent.
 *
 * The returned array holds one pointer per column for the current row; for
 * variable-length columns the pointer addresses the value and the length is
 * stored in the result's `length` array.
 */
void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  const bool needMoreData = (pResultInfo->pData == NULL) || (pResultInfo->current >= pResultInfo->numOfRows);
  if (needMoreData) {
    if (pRequest->type == TDMT_VND_QUERY) {
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        return NULL;
      }

      scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
      setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData);

      if (pResultInfo->numOfRows == 0) {
        return NULL;
      }

      goto _return;
    } else if (pRequest->type == TDMT_MND_SHOW) {
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      // current vgroup is drained: move to the next one and re-issue the show request
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* pShowMsg = buildMsgInfoImpl(pRequest);

      int64_t  showTransporterId = 0;
      STscObj *pShowConn = pRequest->pTscObj;
      asyncSendMsgToServer(pShowConn->pAppInfo->pTransporter, &pShowConn->pAppInfo->mgmtEp.epSet, &showTransporterId, pShowMsg);
      tsem_wait(&pRequest->body.rspSem);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
    }

    // send the retrieve/fetch message for the (possibly updated) request type
    SMsgSendInfo* pFetchMsg = buildMsgInfoImpl(pRequest);

    int64_t  transporterId = 0;
    STscObj *pTscObj = pRequest->pTscObj;
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, pFetchMsg);

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

_return:

  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    pResultInfo->row[col] = pResultInfo->pCol[col] + pResultInfo->fields[col].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[col].type)) {
      pResultInfo->length[col] = varDataLen(pResultInfo->row[col]);
      pResultInfo->row[col] = varDataVal(pResultInfo->row[col]);
    }
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

H
Haojun Liao 已提交
771 772 773 774 775 776 777 778
/*
 * Allocates the per-column row, column-start and length arrays the first time
 * it is called for `pResInfo`; later calls leave the existing arrays alone.
 */
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return;  // already prepared
  }

  pResInfo->row    = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol   = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
}

779
/*
 * Points the per-column cursors of `pResultInfo` at the start of each column
 * inside the result data block. Column i starts at
 * (sum of the byte widths of columns 0..i-1) * numOfRows bytes into pData.
 *
 * Does nothing when `numOfRows` is 0.
 */
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

  // todo check for the failure of malloc
  doPrepareResPtr(pResultInfo);

  int32_t widthSoFar = 0;
  for (int32_t col = 0; col < numOfCols; ++col) {
    const int32_t colBytes = pResultInfo->fields[col].bytes;

    pResultInfo->length[col] = colBytes;
    pResultInfo->row[col]    = (char*) (pResultInfo->pData + widthSoFar * pResultInfo->numOfRows);
    pResultInfo->pCol[col]   = pResultInfo->row[col];
    widthSoFar += colBytes;
  }
}

797 798 799
/*
 * Returns a heap-allocated copy of the connection's current database name,
 * taken while holding the connection mutex, or NULL when no database is set.
 * The caller releases the copy.
 */
char* getConnectionDB(STscObj* pObj) {
  char *dbCopy = NULL;

  pthread_mutex_lock(&pObj->mutex);
  if (pObj->db[0] != 0) {
    dbCopy = strndup(pObj->db, tListLen(pObj->db));
  }
  pthread_mutex_unlock(&pObj->mutex);

  return dbCopy;
}

// Replaces the connection's current database name with `db`, copying it into
// the connection's fixed-size buffer while holding the connection mutex.
// Both arguments must be non-NULL.
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
  pthread_mutex_lock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
815

816 817 818 819
/*
 * Installs a retrieve response as the current result block: remembers the raw
 * response, points pData at its payload, converts the row count with htonl(),
 * rewinds the row cursor, records the completed flag and recomputes the
 * per-column pointers.
 */
void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
  assert(pResultInfo != NULL && pRsp != NULL);

  pResultInfo->pRspMsg   = (const char*) pRsp;
  pResultInfo->pData     = (void*) pRsp->data;
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->current   = 0;

  setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
}