clientImpl.c 26.4 KB
Newer Older
1

2
#include "../../libs/scheduler/inc/schedulerInt.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5 6 7
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
8 9
#include "tdef.h"
#include "tep.h"
10
#include "tglobal.h"
11
#include "tmsgtype.h"
12 13
#include "tnote.h"
#include "tpagedfile.h"
14
#include "tref.h"
X
Xiaoyu Wang 已提交
15

16
/*
 * Evaluate `expr` exactly once. When the result differs from
 * TSDB_CODE_SUCCESS, store it in terrno and jump to `label`.
 *
 * The argument is parenthesized and the temporary carries a name that is
 * unlikely to appear in caller code, so an invocation such as
 * CHECK_CODE_GOTO(code, _return) reads the caller's `code` instead of
 * initializing the temporary from itself.
 */
#define CHECK_CODE_GOTO(expr, label)             \
  do {                                           \
    int32_t checkCodeGotoRc_ = (expr);           \
    if (TSDB_CODE_SUCCESS != checkCodeGotoRc_) { \
      terrno = checkCodeGotoRc_;                 \
      goto label;                                \
    }                                            \
  } while (0)
24

25
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
26 27
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
28
static void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
29

H
Haojun Liao 已提交
30
  /*
   * Length gate shared by the name/password validators below.
   * Returns true only when `str` is non-NULL, non-empty and at most
   * `maxsize` bytes long (terminator not counted).
   */
  static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (NULL == str) {
    return false;
  }

  const size_t n = strlen(str);
  return (n > 0) && (n <= maxsize);
}

/* A user name is accepted when it is non-empty and at most TSDB_USER_LEN - 1 bytes long. */
static bool validateUserName(const char* user) {
  const size_t maxLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxLen);
}

static bool validatePassword(const char* passwd) {
48
  return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
49 50 51 52 53 54
}

/* A database name is accepted when it is non-empty and at most TSDB_DB_NAME_LEN - 1 bytes long. */
static bool validateDbName(const char* db) {
  const size_t maxLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxLen);
}

55 56 57 58 59 60
/*
 * Build the "user:auth:ip:port" string used as the lookup key for cached
 * application instances.
 *
 * A NULL component is rendered as an empty field rather than being handed to
 * "%s" (undefined behaviour); taos_connect_internal() passes a NULL ip when
 * the endpoints come from the configuration. The buffer is sized from the
 * formatted length, so long inputs are no longer truncated at 511 bytes.
 *
 * Returns a heap-allocated string the caller must free, or NULL when
 * formatting or allocation fails.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  static const char* const fmt = "%s:%s:%s:%d";

  const char* u = (user != NULL) ? user : "";
  const char* a = (auth != NULL) ? auth : "";
  const char* h = (ip != NULL) ? ip : "";

  // First pass only measures the formatted length.
  int len = snprintf(NULL, 0, fmt, u, a, h, (int)port);
  if (len < 0) {
    return NULL;
  }

  char* key = malloc((size_t)len + 1);
  if (key == NULL) {
    return NULL;
  }

  snprintf(key, (size_t)len + 1, fmt, u, a, h, (int)port);
  return key;
}

61
static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
62
static void  setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema);
63 64

TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
65 66 67 68
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

69 70 71 72 73
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

74
  char localDb[TSDB_DB_NAME_LEN] = {0};
75 76 77 78 79 80
  if (db != NULL) {
    if(!validateDbName(db)) {
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

81 82
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
83 84
  }

85
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
86 87 88 89 90 91
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

92
    taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
93 94 95 96
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

97
  SCorEpSet epSet = {0};
98 99 100 101 102 103 104 105 106 107 108 109 110 111
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.port[0] = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }

112 113
  char* key = getClusterKey(user, secretEncrypt, ip, port);

114
  // TODO: race condition here.
H
Haojun Liao 已提交
115
  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
116
  if (pInst == NULL) {
H
Haojun Liao 已提交
117 118
    SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
    p->mgmtEp       = epSet;
H
Haojun Liao 已提交
119
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
L
Liu Jicong 已提交
120
    p->pAppHbMgr = appHbMgrInit(p);
H
Haojun Liao 已提交
121
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
122

H
Haojun Liao 已提交
123
    pInst = &p;
124 125
  }

H
Haojun Liao 已提交
126
  tfree(key);
127
  return taosConnectImpl(user, &secretEncrypt[0], localDb, port, NULL, NULL, *pInst);
128 129
}

X
Xiaoyu Wang 已提交
130 131 132
/*
 * Create a request object for `pTscObj` and attach a lower-cased,
 * NUL-terminated copy of the first `sqlLen` bytes of `sql` to it.
 *
 * The new request is always published through *pRequest (NULL when it could
 * not be created), including when the SQL buffer allocation fails.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
  SRequestObj* pReq = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  *pRequest = pReq;
  if (NULL == pReq) {
    tscError("failed to malloc sqlObj");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pReq->sqlstr = malloc(sqlLen + 1);
  if (NULL == pReq->sqlstr) {
    tscError("0x%"PRIx64" failed to prepare sql string buffer", pReq->self);
    pReq->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  strntolower(pReq->sqlstr, sql, (int32_t)sqlLen);
  pReq->sqlstr[sqlLen] = 0;
  pReq->sqlLen = sqlLen;

  tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, pReq->self, pReq->sqlstr, pReq->requestId);
  return TSDB_CODE_SUCCESS;
}
151

X
Xiaoyu Wang 已提交
152
/*
 * Parse the SQL text stored in `pRequest` into a query node.
 *
 * A parse context is filled from the request and its connection (request id,
 * account id, current database, SQL text, error-message buffer, transporter,
 * management endpoint set), a catalog handle is obtained for the cluster, and
 * qParseQuerySql() is invoked. The database string returned by
 * getConnectionDB() is released before returning on every path.
 *
 * Returns the code from catalogGetHandle() when that fails, otherwise the
 * code from qParseQuerySql().
 */
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
  STscObj* pTscObj = pRequest->pTscObj;

  SParseContext cxt = {0};
  cxt.requestId    = pRequest->requestId;
  cxt.acctId       = pTscObj->acctId;
  cxt.db           = getConnectionDB(pTscObj);
  cxt.pSql         = pRequest->sqlstr;
  cxt.sqlLen       = pRequest->sqlLen;
  cxt.pMsg         = pRequest->msgBuf;
  cxt.msgLen       = ERROR_MSG_BUF_DEFAULT_SIZE;
  cxt.pTransporter = pTscObj->pAppInfo->pTransporter;
  cxt.mgmtEpSet    = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
  if (TSDB_CODE_SUCCESS == code) {
    code = qParseQuerySql(&cxt, pQuery);
  }

  tfree(cxt.db);
  return code;
}
H
Haojun Liao 已提交
178

X
Xiaoyu Wang 已提交
179 180 181
/*
 * Send an already-serialized DDL/DCL statement to the server and block until
 * the response semaphore of the request is posted.
 *
 * `pQuery` is treated as an SDclStmtInfo: its message type, payload and
 * endpoint set are used for the send. For TDMT_VND_SHOW_TABLES the vgroup
 * list carried in pExtension is attached to the request on first use so the
 * fetch path can iterate the vnodes.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
  SDclStmtInfo* pDclInfo = (SDclStmtInfo*)pQuery;

  pRequest->type = pDclInfo->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pDclInfo->pMsg, .len = pDclInfo->msgLen};

  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  if (TDMT_VND_SHOW_TABLES == pDclInfo->msgType) {
    SShowReqInfo* pShowInfo = &pRequest->body.showInfo;
    if (NULL == pShowInfo->pArray) {
      pShowInfo->currentIndex = 0;  // set the first vnode/ then iterate the next vnode
      pShowInfo->pArray = pDclInfo->pExtension;
    }
  }

  // Every message type is sent the same way, to the endpoint set of the statement.
  int64_t  transporterId = 0;
  STscObj* pTscObj = pRequest->pTscObj;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDclInfo->epSet, &transporterId, pSendInfo);

  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
204

205 206
/*
 * Build the query DAG for `pQueryNode` and store it in *pDag.
 *
 * The request type is first copied from the query node. For a SELECT the
 * result schema is taken from the data sink of the first sub-plan in the
 * first entry of pSubplans, and the request type becomes TDMT_VND_QUERY.
 *
 * Returns the code from qCreateQueryDag().
 */
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) {
  pRequest->type = pQueryNode->type;

  int32_t code = qCreateQueryDag(pQueryNode, pDag, pRequest->requestId);
  if (0 != code || TSDB_SQL_SELECT != pQueryNode->type) {
    return code;
  }

  SArray*   pFirstLevel = taosArrayGetP((*pDag)->pSubplans, 0);
  SSubplan* pFirstPlan  = taosArrayGetP(pFirstLevel, 0);
  setResSchemaInfo(&pRequest->body.resInfo, &(pFirstPlan->pDataSink->schema));

  pRequest->type = TDMT_VND_QUERY;
  return code;
}

227 228 229 230 231 232 233 234 235 236
/*
 * Copy the column descriptions (name, type, bytes) of a data block schema
 * into the result info of a request.
 *
 * The field array is sized from the destination element type
 * (pResInfo->fields[0]); the previous code used the size of the source
 * schema element, which is only correct while both types happen to have the
 * same size. On allocation failure numOfCols is set to 0, fields stays NULL
 * and terrno is set to TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema) {
  assert(pDataBlockSchema != NULL && pDataBlockSchema->numOfCols > 0);

  pResInfo->fields = calloc(pDataBlockSchema->numOfCols, sizeof(pResInfo->fields[0]));
  if (pResInfo->fields == NULL) {
    pResInfo->numOfCols = 0;
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  pResInfo->numOfCols = pDataBlockSchema->numOfCols;
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    const SSchema* pSchema = &pDataBlockSchema->pSchema[i];
    pResInfo->fields[i].bytes = pSchema->bytes;
    pResInfo->fields[i].type  = pSchema->type;
    tstrncpy(pResInfo->fields[i].name, pSchema->name, tListLen(pResInfo->fields[i].name));
  }
}

241
/*
 * Hand a query DAG to the scheduler.
 *
 * INSERT and CREATE TABLE requests are executed synchronously through
 * scheduleExecJob(); the affected row count and result code are copied into
 * the request. All other requests are started asynchronously through
 * scheduleAsyncExecJob().
 *
 * Returns the resulting code. For the synchronous path a failure reported
 * only through the return value of scheduleExecJob() is no longer lost: it
 * is used whenever the result structure itself carries no error.
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
  if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
    SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};

    int32_t code = scheduleExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
    if (code != TSDB_CODE_SUCCESS) {
      // TODO: handle error and retry
    } else if (pRequest->body.pQueryJob != NULL) {
      scheduleFreeJob(pRequest->body.pQueryJob);
      pRequest->body.pQueryJob = NULL;  // do not keep a pointer to the freed job
    }

    pRequest->body.resInfo.numOfRows = res.numOfRows;
    pRequest->code = (res.code != TSDB_CODE_SUCCESS) ? res.code : code;
    return pRequest->code;
  }

  SArray *execNode = taosArrayInit(4, sizeof(SQueryNodeAddr));
  if (execNode == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // NOTE(review): the execution node is hard-coded (localhost:7100, nodeId 2);
  // this looks like a development placeholder - confirm before relying on it.
  SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 2};
  addr.epAddr[0].port = 7100;
  strcpy(addr.epAddr[0].fqdn, "localhost");

  taosArrayPush(execNode, &addr);
  return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob);
}
X
Xiaoyu Wang 已提交
268

L
Liu Jicong 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
typedef struct tmq_t tmq_t;

/*
 * Client-side state for one topic of a message-queue consumer. Elements of
 * this type are stored in tmq_t.clientTopics (see the comment on that field).
 */
typedef struct SMqClientTopic {
  // subscribe info
  int32_t sqlLen;
  char*   sql;
  char*   topicName;
  int64_t topicId;
  // statistics
  int64_t consumeCnt;
  // offset
  int64_t committedOffset;
  int64_t currentOffset;
  // connection info
  int32_t vgId;   // tmqGetConnInfo() reports heartbeat status 1 when any topic has vgId == -1
  SEpSet  epSet;
} SMqClientTopic;

/* Response status carrying a single result code; passed by value to tmq_commit_cb callbacks. */
typedef struct tmq_resp_err_t {
  int32_t code;
} tmq_resp_err_t;

/*
 * Topic name, vgroup id and committed offset grouped together; a pointer to
 * this type is the third argument of tmq_commit_cb callbacks.
 * NOTE(review): despite the "list" in the name the struct describes a single
 * entry - confirm how multiple entries are meant to be passed.
 */
typedef struct tmq_topic_vgroup_list_t {
  char* topicName;
  int32_t vgId;
  int64_t committedOffset;
} tmq_topic_vgroup_list_t;

/*
 * Function type of the callback stored in tmq_conf_t.commit_cb and
 * tmq_t.commit_cb. It receives the consumer, a response status, a
 * topic/vgroup entry and an opaque user parameter.
 * NOTE(review): presumably invoked after an offset commit; no call site is
 * visible in this file - confirm.
 */
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));

/* Configuration from which tmqCreateConsumerImpl() builds a consumer. */
typedef struct tmq_conf_t{
  char*          clientId;   // copied into tmq_t.clientId by tmqCreateConsumerImpl()
  char*          groupId;    // copied into tmq_t.groupId by tmqCreateConsumerImpl()
  char*          ip;
  uint16_t       port;
  tmq_commit_cb* commit_cb;  // set through tmq_conf_set_offset_commit_cb()
} tmq_conf_t;

/* Message-queue consumer handle, created by tmqCreateConsumerImpl(). */
struct tmq_t {
  char           groupId[256];   // copied from tmq_conf_t.groupId
  char           clientId[256];  // copied from tmq_conf_t.clientId
  STscObj*       pTscObj;        // connection the consumer was created on
  tmq_commit_cb* commit_cb;
  SArray*        clientTopics;  // SArray<SMqClientTopic>
};

/*
 * Store `cb` as the commit callback of `conf`.
 * A NULL `conf` is ignored instead of being dereferenced; a NULL `cb` is
 * stored as-is and clears the callback.
 */
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
  if (conf == NULL) {
    return;
  }

  conf->commit_cb = cb;
}

/*
 * Build the key/value list a message-queue consumer contributes to a
 * heartbeat.
 *
 * connKey  heartbeat key; its connId is reported as the consumer id
 * param    the tmq_t consumer
 *
 * Returns an array holding one SKv (key "mq-tmp", value an SMqHbMsg), an
 * empty array when the SMqHbMsg cannot be allocated, or NULL when the array
 * or the key cannot be allocated.
 *
 * The heartbeat message is zero-initialized, so `status` is 0 unless a topic
 * with vgId == -1 is found, and the key buffer is released when the message
 * allocation fails.
 */
SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
  tmq_t* pTmq = (void*)param;
  SArray* pArray = taosArrayInit(0, sizeof(SKv));
  if (pArray == NULL) {
    return NULL;
  }

  SKv kv = {0};
  kv.key = malloc(256);
  if (kv.key == NULL) {
    taosArrayDestroy(pArray);
    return NULL;
  }
  strcpy(kv.key, "mq-tmp");
  kv.keyLen = strlen("mq-tmp") + 1;

  SMqHbMsg* pMqHb = calloc(1, sizeof(SMqHbMsg));
  if (pMqHb == NULL) {
    tfree(kv.key);  // the key was never handed to the array
    return pArray;
  }

  pMqHb->consumerId = connKey.connId;
  SArray* clientTopics = pTmq->clientTopics;
  int sz = taosArrayGetSize(clientTopics);
  for (int i = 0; i < sz; i++) {
    SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
    if (pCTopic->vgId == -1) {
      pMqHb->status = 1;
      break;
    }
  }

  kv.value = pMqHb;
  kv.valueLen = sizeof(SMqHbMsg);
  taosArrayPush(pArray, &kv);

  return pArray;
}

/*
 * Create a message-queue consumer bound to the connection `conn`.
 *
 * The group and client ids are copied with truncation to the 256-byte
 * buffers of tmq_t (a NULL id leaves the buffer empty), the commit callback
 * is taken from `conf`, and the topic array is created empty so that
 * tmqGetConnInfo() can iterate it. The connection is tagged as
 * HEARTBEAT_TYPE_MQ.
 *
 * Returns the consumer, or NULL when an argument is NULL (terrno is set to
 * TSDB_CODE_TSC_INVALID_INPUT) or an allocation fails.
 *
 * NOTE(review): if other code assigns tmq_t.clientTopics later it must
 * release the empty array created here - confirm against the subscribe path.
 */
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
  if (conn == NULL || conf == NULL) {
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    return NULL;
  }

  // calloc leaves no member holding indeterminate bytes
  tmq_t* pTmq = calloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    tfree(pTmq);
    return NULL;
  }

  if (conf->groupId != NULL) {
    tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  }
  if (conf->clientId != NULL) {
    tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  }

  pTmq->commit_cb = conf->commit_cb;
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;

  return pTmq;
}

L
Liu Jicong 已提交
367
/*
 * Create a topic named `topicName` from the query `sql` (first `sqlLen`
 * bytes) on the connection `taos`.
 *
 * The SQL is parsed, turned into a query DAG, serialized together with the
 * fully-qualified topic name into a TDMT_MND_CREATE_TOPIC message and sent
 * to the management endpoint; the call blocks until the response arrives.
 *
 * Returns the request object (its `code` holds the outcome), or NULL when the
 * arguments are rejected before a request exists. terrno is set on failure.
 *
 * Ownership: once the message has been handed to asyncSendMsgToServer() the
 * send-info structure is released by processMsgFromServer(), so it is not
 * destroyed again here.
 */
TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
  STscObj     *pTscObj = (STscObj*)taos;
  SRequestObj *pRequest = NULL;
  SQueryNode  *pQueryNode = NULL;
  char        *pStr = NULL;

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || topicName == NULL || sql == NULL) {
    // never pass NULL to "%s"
    tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos,
             (topicName != NULL) ? topicName : "(null)", (sql != NULL) ? sql : "(null)");
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
    tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create topic, %s", topicName);

  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);

  SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo* ) pQueryNode;
  pQueryStmtInfo->info.continueQuery = true;

  // todo check for invalid sql statement and return with error code

  CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return);

  pStr = qDagToString(pRequest->body.pDag);
  if (pStr == NULL) {
    // do not report success when the plan could not be serialized
    if (terrno == TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    goto _return;
  }

  // The plan goes to the debug log instead of being printed on stdout.
  tscDebugL("0x%"PRIx64" physical plan of topic %s: %s", pRequest->self, topicName, pStr);

  // The topic should be related to a database that the queried table is belonged to.
  SName name = {0};
  char dbName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(&((SQueryStmtInfo*) pQueryNode)->pTableMetaInfo[0]->name, dbName);

  tNameFromString(&name, dbName, T_NAME_ACCT|T_NAME_DB);
  tNameFromString(&name, topicName, T_NAME_TABLE);

  char topicFname[TSDB_TOPIC_FNAME_LEN] = {0};
  tNameExtractFullName(&name, topicFname);

  SCMCreateTopicReq req = {
    .name         = (char*) topicFname,
    .igExists     = 0,
    .physicalPlan = (char*) pStr,
    .sql          = (char*) sql,
    .logicalPlan  = "no logic plan",
  };

  int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
  void* buf = malloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _return;
  }

  void* abuf = buf;
  tSerializeSCMCreateTopicReq(&abuf, &req);

  // NOTE(review): pStr is not released anywhere in this function; it looks
  // like it can be freed once the request is serialized - confirm which
  // allocator qDagToString() uses before adding the free.

  pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
  pRequest->type = TDMT_MND_CREATE_TOPIC;

  SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
  if (body == NULL) {
    if (terrno == TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    goto _return;
  }

  SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);

_return:
  // `body` is deliberately not touched here: on early exits it does not exist
  // yet, and after a completed send it has already been released by the
  // response handler.
  qDestroyQuery(pQueryNode);

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}

L
Liu Jicong 已提交
464
typedef struct tmq_message_t {
H
Haojun Liao 已提交
465
  int32_t  numOfRows;
L
Liu Jicong 已提交
466 467 468 469 470 471 472 473 474 475 476 477 478
  char*    topicName;
  TAOS_ROW row[];
} tmq_message_t;

/* Placeholder: polling is not implemented yet; both arguments are ignored and NULL is returned. */
tmq_message_t* tmq_consume_poll(tmq_t* mq, int64_t blocking_time) {
  (void)mq;
  (void)blocking_time;

  tmq_message_t* noMessage = NULL;
  return noMessage;
}

/* Placeholder: committing is not implemented yet; all arguments are ignored and NULL is returned. */
tmq_resp_err_t* tmq_commit(tmq_t* mq, void* callback, int32_t async) {
  (void)mq;
  (void)callback;
  (void)async;

  tmq_resp_err_t* noResult = NULL;
  return noResult;
}

/* Placeholder: releases nothing yet; the message is left untouched. */
void tmq_message_destroy(tmq_message_t* mq_message) {
  (void)mq_message;
}


X
Xiaoyu Wang 已提交
483 484 485 486 487 488
/*
 * Execute the first `sqlLen` bytes of `sql` on the connection `taos`.
 *
 * The statement is parsed; DDL statements are sent directly through
 * execDdlQuery(), everything else is planned and passed to the scheduler.
 *
 * Returns the request object (its `code` reflects terrno on failure), or
 * NULL when the statement exceeds tsMaxSQLStringLen or the request could not
 * be created.
 */
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
  if (sqlLen > (size_t) tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  nPrintTsc("%s", sql)

  STscObj     *pTscObj = (STscObj *)taos;
  SRequestObj *pRequest = NULL;
  SQueryNode  *pQueryNode = NULL;

  terrno = TSDB_CODE_SUCCESS;
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);

  if (!qIsDdlQuery(pQueryNode)) {
    // plan first, then let the scheduler run the DAG
    CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag), _return);
    CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag), _return);
    pRequest->code = terrno;
  } else {
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return);
  }

_return:
  qDestroyQuery(pQueryNode);
  if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
    pRequest->code = terrno;
  }

  return pRequest;
}

517
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
518 519
  pEpSet->version = 0;

H
Haojun Liao 已提交
520
  // init mnode ip set
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
  SEpSet *mgmtEpSet   = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse    = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(firstEp, mgmtEpSet->fqdn[0], &(mgmtEpSet->port[0]));
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(secondEp, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
    mgmtEpSet->numOfEps++;
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

553
/*
 * Create a connection object for `pAppInfo`, send a TDMT_MND_CONNECT request
 * to the management endpoint set and wait for the response.
 *
 * Returns the connection object on success. On failure the request and the
 * connection object are released and NULL is returned; terrno is set for
 * allocation failures.
 *
 * A NULL connect message from buildConnectMsg() is now treated as a failure
 * instead of being sent and waited on.
 */
STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
  STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgSendInfo* body = buildConnectMsg(pRequest);
  if (body == NULL) {
    // buildConnectMsg() has set terrno; keep it across the cleanup calls
    int32_t code = terrno;
    destroyRequest(pRequest);
    taos_close(pTscObj);
    terrno = code;
    return NULL;
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
    destroyRequest(pRequest);
  }

  return pTscObj;
}

588 589 590 591 592 593 594
/*
 * Allocate and fill the send-info structure for a TDMT_MND_CONNECT message.
 *
 * The payload is an SConnectReq carrying the current database of the
 * connection (when one is set), the process id and start time in network
 * byte order, and the application name.
 *
 * Returns the send info, or NULL with terrno set to
 * TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation fails.
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
  SMsgSendInfo *pInfo = calloc(1, sizeof(SMsgSendInfo));
  if (NULL == pInfo) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->msgType         = TDMT_MND_CONNECT;
  pInfo->msgInfo.len     = sizeof(SConnectReq);
  pInfo->requestObjRefId = pRequest->self;
  pInfo->requestId       = pRequest->requestId;
  pInfo->fp              = handleRequestRspFp[TMSG_INDEX(pInfo->msgType)];
  pInfo->param           = pRequest;

  SConnectReq *pReq = calloc(1, sizeof(SConnectReq));
  if (NULL == pReq) {
    tfree(pInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  // getConnectionDB() returns a private copy (or NULL); release it once copied.
  char* dbCopy = getConnectionDB(pRequest->pTscObj);
  if (NULL != dbCopy) {
    tstrncpy(pReq->db, dbCopy, sizeof(pReq->db));
  }
  tfree(dbCopy);

  pReq->pid = htonl(appInfo.pid);
  pReq->startTime = htobe64(appInfo.startTime);
  tstrncpy(pReq->app, appInfo.appName, tListLen(pReq->app));

  pInfo->msgInfo.pData = pReq;
  return pInfo;
}

625
/* Release a send-info structure together with the payload it owns; `pMsgBody` must not be NULL. */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(pMsgBody != NULL);

  // payload first, then the container that points to it
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
}

631
/*
 * RPC response handler: dispatches a server response to the callback that
 * was registered in the send-info structure carried in pMsg->ahandle.
 *
 * When the send info refers to a request object, the request's response time
 * and code are recorded, the management endpoint set is refreshed if the
 * server supplied a different one, and the round trip is logged. The payload
 * is copied into a fresh buffer that is handed to the callback; the RPC
 * buffer and the send info are released before returning.
 *
 * Changes over the previous version:
 *  - a request whose reference can no longer be acquired is not dereferenced;
 *    the response is dropped and the resources are released;
 *  - when the payload copy cannot be allocated the callback receives len 0
 *    together with the NULL data pointer;
 *  - the leftover debug printf of the buffer address is removed.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  assert(pMsg->ahandle != NULL);
  SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;

  if (pSendInfo->requestObjRefId != 0) {
    SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (pRequest == NULL) {
      // NOTE(review): assumes a NULL result means the request is gone, in
      // which case pSendInfo->param would dangle - confirm with the ref pool.
      tscError("0x%" PRIx64 " request is no longer available, response of %s is dropped",
               pSendInfo->requestObjRefId, TMSG_INFO(pMsg->msgType));
      rpcFreeCont(pMsg->pCont);
      destroySendMsgInfo(pSendInfo);
      return;
    }

    assert(pRequest->self == pSendInfo->requestObjRefId);

    pRequest->metric.rsp = taosGetTimestampMs();
    pRequest->code = pMsg->code;

    STscObj *pTscObj = pRequest->pTscObj;
    if (pEpSet) {
      if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      }
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
    if (pMsg->code == TSDB_CODE_SUCCESS) {
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
    } else {
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
    }

    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
  }

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
      buf.len = 0;  // never advertise a length without data
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708

/*
 * Connect using a pre-computed auth secret instead of a password.
 *
 * ip    server endpoint, may be NULL
 * user  user name; TSDB_DEFAULT_USER is used when NULL
 * auth  secret, required
 * db    optional database name
 * port  server port
 *
 * Returns the connection, or NULL when `auth` is missing or the connection
 * fails.
 */
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  // Logged after the default user is applied, with NULL strings replaced, so
  // that no NULL pointer reaches a "%s" conversion.
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", (ip != NULL) ? ip : "(null)", port, user,
           (db != NULL) ? db : "(null)");

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

/*
 * Copy at most `len` bytes of `src` into the zero-initialized buffer `dst` of
 * capacity `cap`, always leaving room for the terminator. A NULL source or a
 * non-positive length leaves `dst` empty.
 */
static void copyConnectArg(char* dst, size_t cap, const char* src, int len) {
  if (src == NULL || len <= 0 || cap == 0) {
    return;
  }

  size_t n = MIN(cap - 1, (size_t)len);
  strncpy(dst, src, n);  // dst[n..] is still zero, so the result is terminated
}

/*
 * Length-delimited variant of taos_connect(): every string argument comes
 * with an explicit length and need not be NUL-terminated.
 *
 * Each argument is copied into a bounded local buffer and passed on to
 * taos_connect(). An argument that is NULL is passed on as NULL, and a
 * negative length is treated as an empty string; previously both cases
 * reached strncpy() directly.
 */
TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN]      = {0};
  char dbStr[TSDB_DB_NAME_LEN] = {0};
  char userStr[TSDB_USER_LEN]  = {0};
  char passStr[TSDB_PASSWORD_LEN]   = {0};

  copyConnectArg(ipStr,   sizeof(ipStr),   ip,   ipLen);
  copyConnectArg(userStr, sizeof(userStr), user, userLen);
  copyConnectArg(passStr, sizeof(passStr), pass, passLen);
  copyConnectArg(dbStr,   sizeof(dbStr),   db,   dbLen);

  return taos_connect((ip != NULL) ? ipStr : NULL, (user != NULL) ? userStr : NULL, (pass != NULL) ? passStr : NULL,
                      (db != NULL) ? dbStr : NULL, port);
}

void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
713
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
714

H
Haojun Liao 已提交
715 716
  SEpSet epSet = {0};

H
Haojun Liao 已提交
717
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
718
    if (pRequest->type == TDMT_VND_QUERY) {
719 720 721 722 723
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        return NULL;
      }

724 725 726 727 728
      int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
      if (code != TSDB_CODE_SUCCESS) {
        pRequest->code = code;
        return NULL;
      }
H
Haojun Liao 已提交
729

730
      setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData);
731
      if (pResultInfo->numOfRows == 0) {
D
dapan1121 已提交
732 733 734 735
        return NULL;
      }

      goto _return;
736
    } else if (pRequest->type == TDMT_MND_SHOW) {
H
Haojun Liao 已提交
737
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
H
Haojun Liao 已提交
738
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
H
Haojun Liao 已提交
739 740
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
741 742 743 744 745 746 747 748 749 750 751
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);

      epSet.numOfEps = pVgroupInfo->numOfEps;
      epSet.inUse = pVgroupInfo->inUse;

      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
        epSet.port[i] = pVgroupInfo->epAddr[i].port;
      }

H
Haojun Liao 已提交
752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

H
Haojun Liao 已提交
769 770 771 772 773 774 775 776
      epSet.numOfEps = pVgroupInfo->numOfEps;
      epSet.inUse = pVgroupInfo->inUse;

      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
        epSet.port[i] = pVgroupInfo->epAddr[i].port;
      }

H
Haojun Liao 已提交
777 778
      int64_t  transporterId = 0;
      STscObj *pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
779
      asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
780 781 782
      tsem_wait(&pRequest->body.rspSem);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
783 784
    } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE && pResultInfo->pData != NULL) {
      return NULL;
H
Haojun Liao 已提交
785
    }
H
Haojun Liao 已提交
786

787
    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
H
Haojun Liao 已提交
788

789 790
    int64_t  transporterId = 0;
    STscObj *pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
791
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
792 793 794 795 796 797 798 799 800

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

D
dapan1121 已提交
801 802
_return:

H
Haojun Liao 已提交
803 804 805 806 807
  for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
808
    }
H
Haojun Liao 已提交
809 810 811 812 813 814
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

H
Haojun Liao 已提交
815 816 817 818 819 820 821 822
/*
 * Allocate the per-column helper arrays (row, pCol, length) of a result info
 * the first time they are needed; does nothing when `row` already exists.
 *
 * The allocation is all-or-nothing: when any of the three arrays cannot be
 * allocated, all of them are released and left NULL and terrno is set to
 * TSDB_CODE_TSC_OUT_OF_MEMORY, so a later call retries from a clean state
 * and callers can detect the failure by checking `row`.
 */
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return;
  }

  pResInfo->row    = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol   = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));

  if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL) {
    tfree(pResInfo->row);
    tfree(pResInfo->pCol);
    tfree(pResInfo->length);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
  }
}

823
/*
 * Set up the per-column pointers of a result info for a freshly received
 * data block in which each column's values are stored contiguously.
 *
 * For column i, row[i] and pCol[i] point at the start of that column inside
 * pData and length[i] is the declared width of the column. Nothing is done
 * when `numOfRows` is 0.
 *
 * When the helper arrays cannot be allocated, terrno is set to
 * TSDB_CODE_TSC_OUT_OF_MEMORY and the function returns without writing
 * through NULL pointers.
 *
 * NOTE(review): column widths and the row count are read from pResultInfo
 * rather than from the `pFields` / `numOfRows` parameters; the call in this
 * file passes the same values - confirm for other callers.
 */
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

  doPrepareResPtr(pResultInfo);
  if (pResultInfo->row == NULL || pResultInfo->pCol == NULL || pResultInfo->length == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  int32_t offset = 0;  // sum of the widths of the columns handled so far
  for (int32_t i = 0; i < numOfCols; ++i) {
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
    pResultInfo->row[i]    = (char*) (pResultInfo->pData + offset * pResultInfo->numOfRows);
    pResultInfo->pCol[i]   = pResultInfo->row[i];
    offset += pResultInfo->fields[i].bytes;
  }
}

841 842 843
/*
 * Return a heap-allocated copy of the current database name of the
 * connection, or NULL when no database is set. The name is read under the
 * connection mutex; the caller owns the returned string.
 */
char* getConnectionDB(STscObj* pObj) {
  char* dbCopy = NULL;

  pthread_mutex_lock(&pObj->mutex);
  if (pObj->db[0] != 0) {
    dbCopy = strndup(pObj->db, tListLen(pObj->db));
  }
  pthread_mutex_unlock(&pObj->mutex);

  return dbCopy;
}

/* Replace the current database name of the connection with `db`, under the connection mutex. */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);

  pthread_mutex_t* pLock = &pTscObj->mutex;
  pthread_mutex_lock(pLock);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(pLock);
}
S
Shengliang Guan 已提交
859

860 861 862 863
/*
 * Attach a retrieve response to a result info: remember the response
 * message, point pData at its payload, take the row count (converted from
 * network byte order) and the completion flag, reset the read cursor, and
 * rebuild the per-column pointers.
 */
void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
  assert(pResultInfo != NULL && pRsp != NULL);

  pResultInfo->current   = 0;
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
  pResultInfo->pData     = (void*) pRsp->data;
  pResultInfo->pRspMsg   = (const char*) pRsp;

  setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
}