clientImpl.c 30.7 KB
Newer Older
1

2
#include "../../libs/scheduler/inc/schedulerInt.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5 6 7
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
8 9
#include "tdef.h"
#include "tep.h"
10
#include "tglobal.h"
11
#include "tmsgtype.h"
12 13
#include "tnote.h"
#include "tpagedfile.h"
14
#include "tref.h"
X
Xiaoyu Wang 已提交
15

16
#define CHECK_CODE_GOTO(expr, label) \
H
Haojun Liao 已提交
17 18
  do {                               \
    int32_t code = expr;             \
X
Xiaoyu Wang 已提交
19
    if (TSDB_CODE_SUCCESS != code) { \
H
Haojun Liao 已提交
20
      terrno = code;                 \
21
      goto label;                    \
H
Haojun Liao 已提交
22
    }                                \
X
Xiaoyu Wang 已提交
23
  } while (0)
24

25
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
26 27
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
28
static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
29

30
static bool stringLengthCheck(const char* str, size_t maxsize) {
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
  if (str == NULL) {
    return false;
  }

  size_t len = strlen(str);
  if (len <= 0 || len > maxsize) {
    return false;
  }

  return true;
}

static bool validateUserName(const char* user) {
  return stringLengthCheck(user, TSDB_USER_LEN - 1);
}

static bool validatePassword(const char* passwd) {
48
  return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
49 50 51 52 53 54
}

static bool validateDbName(const char* db) {
  return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1);
}

55 56 57 58 59 60
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
  return strdup(key);
}

61
static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
62
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
63 64

TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
65 66 67 68
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

69 70 71 72 73
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

74
  char localDb[TSDB_DB_NAME_LEN] = {0};
75 76 77 78 79 80
  if (db != NULL) {
    if(!validateDbName(db)) {
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

81 82
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
83 84
  }

85
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
86 87 88 89 90 91
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

92
    taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
93 94 95 96
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

97
  SCorEpSet epSet = {0};
98 99 100 101 102 103 104 105 106 107 108 109 110 111
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.port[0] = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }

112 113
  char* key = getClusterKey(user, secretEncrypt, ip, port);

114
  // TODO: race condition here.
H
Haojun Liao 已提交
115
  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
116
  if (pInst == NULL) {
H
Haojun Liao 已提交
117 118
    SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
    p->mgmtEp       = epSet;
H
Haojun Liao 已提交
119
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
L
Liu Jicong 已提交
120
    p->pAppHbMgr = appHbMgrInit(p);
H
Haojun Liao 已提交
121
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
122

H
Haojun Liao 已提交
123
    pInst = &p;
124 125
  }

H
Haojun Liao 已提交
126
  tfree(key);
127
  return taosConnectImpl(user, &secretEncrypt[0], localDb, port, NULL, NULL, *pInst);
128 129
}

X
Xiaoyu Wang 已提交
130 131 132
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
133
    tscError("failed to malloc sqlObj");
X
Xiaoyu Wang 已提交
134
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
135 136
  }

X
Xiaoyu Wang 已提交
137 138 139 140 141
  (*pRequest)->sqlstr = malloc(sqlLen + 1);
  if ((*pRequest)->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self);
    (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
142 143
  }

X
Xiaoyu Wang 已提交
144 145 146
  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;
147

H
Haojun Liao 已提交
148
  tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
X
Xiaoyu Wang 已提交
149 150
  return TSDB_CODE_SUCCESS;
}
151

X
Xiaoyu Wang 已提交
152
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
H
Haojun Liao 已提交
153 154
  STscObj* pTscObj = pRequest->pTscObj;

X
Xiaoyu Wang 已提交
155
  SParseContext cxt = {
H
Haojun Liao 已提交
156
    .requestId = pRequest->requestId,
H
Haojun Liao 已提交
157 158 159 160 161 162
    .acctId    = pTscObj->acctId,
    .db        = getConnectionDB(pTscObj),
    .pSql      = pRequest->sqlstr,
    .sqlLen    = pRequest->sqlLen,
    .pMsg      = pRequest->msgBuf,
    .msgLen    = ERROR_MSG_BUF_DEFAULT_SIZE,
H
Haojun Liao 已提交
163
    .pTransporter = pTscObj->pAppInfo->pTransporter,
X
Xiaoyu Wang 已提交
164
  };
H
Haojun Liao 已提交
165

H
Haojun Liao 已提交
166 167
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
H
Haojun Liao 已提交
168
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
169
    tfree(cxt.db);
H
Haojun Liao 已提交
170 171 172 173
    return code;
  }

  code = qParseQuerySql(&cxt, pQuery);
174

H
Haojun Liao 已提交
175
  tfree(cxt.db);
X
Xiaoyu Wang 已提交
176 177
  return code;
}
H
Haojun Liao 已提交
178

X
Xiaoyu Wang 已提交
179 180 181
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
  SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
  pRequest->type = pDcl->msgType;
H
Haojun Liao 已提交
182
  pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen};
X
Xiaoyu Wang 已提交
183

H
Haojun Liao 已提交
184
  STscObj* pTscObj = pRequest->pTscObj;
185
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
X
Xiaoyu Wang 已提交
186

187
  int64_t transporterId = 0;
188 189 190 191
  if (pDcl->msgType == TDMT_VND_CREATE_TABLE || pDcl->msgType == TDMT_VND_SHOW_TABLES) {
    if (pDcl->msgType == TDMT_VND_SHOW_TABLES) {
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      if (pShowReqInfo->pArray == NULL) {
H
Haojun Liao 已提交
192
        pShowReqInfo->currentIndex = 0;  // set the first vnode/ then iterate the next vnode
193 194 195
        pShowReqInfo->pArray = pDcl->pExtension;
      }
    }
H
Haojun Liao 已提交
196
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
X
Xiaoyu Wang 已提交
197
  } else {
H
Haojun Liao 已提交
198
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
199 200
  }

X
Xiaoyu Wang 已提交
201 202 203
  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
204

205
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, SArray* pNodeList) {
206
  pRequest->type = pQueryNode->type;
207

208 209 210
  SSchema* pSchema = NULL;
  int32_t  numOfCols = 0;
  int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId);
211 212 213 214 215
  if (code != 0) {
    return code;
  }

  if (pQueryNode->type == TSDB_SQL_SELECT) {
216
    setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols);
217
    pRequest->type = TDMT_VND_QUERY;
D
dapan1121 已提交
218 219
  } else {
    tfree(pSchema);
220 221 222
  }

  return code;
X
Xiaoyu Wang 已提交
223 224
}

225 226
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  assert(pSchema != NULL && numOfCols > 0);
227

228 229
  pResInfo->numOfCols = numOfCols;
  pResInfo->fields = calloc(numOfCols, sizeof(pSchema[0]));
230 231

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
232 233 234
    pResInfo->fields[i].bytes = pSchema[i].bytes;
    pResInfo->fields[i].type  = pSchema[i].type;
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
235
  }
X
Xiaoyu Wang 已提交
236 237
}

238
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
239
  if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
X
Xiaoyu Wang 已提交
240
    SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
241

D
dapan1121 已提交
242 243
    taosArrayDestroy(pNodeList);

D
dapan1121 已提交
244
    int32_t code = schedulerExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
245 246 247
    if (code != TSDB_CODE_SUCCESS) {
      // handle error and retry
    } else {
248
      if (pRequest->body.pQueryJob != NULL) {
D
dapan1121 已提交
249
        schedulerFreeJob(pRequest->body.pQueryJob);
250 251 252
      }
    }

H
Haojun Liao 已提交
253 254 255
    pRequest->body.resInfo.numOfRows = res.numOfRows;
    pRequest->code = res.code;
    return pRequest->code;
X
Xiaoyu Wang 已提交
256
  }
257

258
  return schedulerAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob);
X
Xiaoyu Wang 已提交
259
}
X
Xiaoyu Wang 已提交
260

L
Liu Jicong 已提交
261

L
Liu Jicong 已提交
262
typedef struct SMqClientVg {
L
Liu Jicong 已提交
263 264 265 266 267 268 269 270
  // statistics
  int64_t consumeCnt;
  // offset
  int64_t committedOffset;
  int64_t currentOffset;
  //connection info
  int32_t vgId;
  SEpSet  epSet;
L
Liu Jicong 已提交
271 272 273 274 275 276 277 278 279 280
} SMqClientVg;

typedef struct SMqClientTopic {
  // subscribe info
  int32_t sqlLen;
  char*   sql;
  char*   topicName;
  int64_t topicId;
  int32_t nextVgIdx;
  SArray* vgs;    //SArray<SMqClientVg>
L
Liu Jicong 已提交
281 282 283 284 285 286
} SMqClientTopic;

typedef struct tmq_resp_err_t {
  int32_t code;
} tmq_resp_err_t;

L
Liu Jicong 已提交
287 288
typedef struct tmq_topic_vgroup_t {
  char*   topic;
L
Liu Jicong 已提交
289
  int32_t vgId;
L
Liu Jicong 已提交
290 291 292 293 294 295 296
  int64_t commitOffset;
} tmq_topic_vgroup_t;

typedef struct tmq_topic_vgroup_list_t {
  int32_t cnt;
  int32_t size;
  tmq_topic_vgroup_t* elems;
L
Liu Jicong 已提交
297 298 299 300
} tmq_topic_vgroup_list_t;

typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));

L
Liu Jicong 已提交
301 302 303
struct tmq_conf_t {
  char           clientId[256];
  char           groupId[256];
L
Liu Jicong 已提交
304 305 306
  char*          ip;
  uint16_t       port;
  tmq_commit_cb* commit_cb;
L
Liu Jicong 已提交
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
};

tmq_conf_t* tmq_conf_new() {
  tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
  return conf;
}

int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
  if (strcmp(key, "group.id")) {
    strcpy(conf->groupId, value);
  }
  if (strcmp(key, "client.id")) {
    strcpy(conf->clientId, value);
  }
  return 0;
}
L
Liu Jicong 已提交
323 324 325 326

struct tmq_t {
  char           groupId[256];
  char           clientId[256];
L
Liu Jicong 已提交
327 328
  int64_t        consumerId;
  int64_t        status;
L
Liu Jicong 已提交
329 330
  STscObj*       pTscObj;
  tmq_commit_cb* commit_cb;
L
Liu Jicong 已提交
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
  int32_t        nextTopicIdx;
  SArray*        clientTopics;  //SArray<SMqClientTopic>
};

tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->commit_cb = conf->commit_cb;
  pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1);
  return pTmq;
}

struct tmq_list_t {
  int32_t cnt;
  int32_t tot;
  char*   elems[];
L
Liu Jicong 已提交
353
};
L
Liu Jicong 已提交
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423
tmq_list_t* tmq_list_new() {
  tmq_list_t *ptr = malloc(sizeof(tmq_list_t) + 8 * sizeof(char*));
  if (ptr == NULL) {
    return ptr;
  }
  ptr->cnt = 0;
  ptr->tot = 8;
  return ptr;
}

int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
  if (ptr->cnt >= ptr->tot-1) return -1;
  ptr->elems[ptr->cnt] = src;
  ptr->cnt++;
  return 0;
}


TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
  SRequestObj *pRequest = NULL;
  tmq->status = 1;
  int32_t sz = topic_list->cnt;
  tmq->clientTopics = taosArrayInit(sz, sizeof(void*));
  for (int i = 0; i < sz; i++) {
    char* topicName = strdup(topic_list->elems[i]);
    taosArrayPush(tmq->clientTopics, &topicName); 
  }
  SCMSubscribeReq req;
  req.topicNum = taosArrayGetSize(tmq->clientTopics);
  req.consumerId = tmq->consumerId;
  req.consumerGroup = strdup(tmq->groupId);
  req.topicNames = tmq->clientTopics;

  int tlen = tSerializeSCMSubscribeReq(NULL, &req);
  void* buf = malloc(tlen);
  if(buf == NULL) {
    goto _return;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);
  /*printf("formatted: %s\n", dagStr);*/

  pRequest = createRequest(tmq->pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (pRequest == NULL) {
    tscError("failed to malloc sqlObj");
  }

  pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
  pRequest->type = TDMT_MND_CREATE_TOPIC;

  SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);

_return:
  if (body != NULL) {
    destroySendMsgInfo(body);
  }

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}
L
Liu Jicong 已提交
424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440

void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
  conf->commit_cb = cb;
}

SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
  tmq_t* pTmq = (void*)param;
  SArray* pArray = taosArrayInit(0, sizeof(SKv));
  if (pArray == NULL) {
    return NULL;
  }
  SKv kv = {0};
  kv.key = malloc(256);
  if (kv.key == NULL) {
    taosArrayDestroy(pArray);
    return NULL;
  }
L
Liu Jicong 已提交
441 442 443 444 445
  strcpy(kv.key, "mq-tmp");
  kv.keyLen = strlen("mq-tmp") + 1;
  SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg));
  if (pMqHb == NULL) {
    return pArray;
L
Liu Jicong 已提交
446
  }
L
Liu Jicong 已提交
447 448 449 450 451
  pMqHb->consumerId = connKey.connId;
  SArray* clientTopics = pTmq->clientTopics;
  int sz = taosArrayGetSize(clientTopics);
  for (int i = 0; i < sz; i++) {
    SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
L
Liu Jicong 已提交
452 453 454 455
    /*if (pCTopic->vgId == -1) {*/
      /*pMqHb->status = 1;*/
      /*break;*/
    /*}*/
L
Liu Jicong 已提交
456 457 458
  }
  kv.value = pMqHb;
  kv.valueLen = sizeof(SMqHbMsg);
L
Liu Jicong 已提交
459
  taosArrayPush(pArray, &kv);
L
Liu Jicong 已提交
460 461

  return pArray;
L
Liu Jicong 已提交
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476
}

tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
  tmq_t* pTmq = malloc(sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }
  strcpy(pTmq->groupId, conf->groupId);
  strcpy(pTmq->clientId, conf->clientId);
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;

  return pTmq;
}

L
Liu Jicong 已提交
477
TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
478 479 480 481
  STscObj     *pTscObj = (STscObj*)taos;
  SRequestObj *pRequest = NULL;
  SQueryNode  *pQueryNode = NULL;
  char        *pStr = NULL;
L
Liu Jicong 已提交
482 483

  terrno = TSDB_CODE_SUCCESS;
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502
  if (taos == NULL || topicName == NULL || sql == NULL) {
    tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
    tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create topic, %s", topicName);
L
Liu Jicong 已提交
503 504

  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
505
  CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
L
Liu Jicong 已提交
506

H
Haojun Liao 已提交
507 508 509
  SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo* ) pQueryNode;
  pQueryStmtInfo->info.continueQuery = true;

510
  // todo check for invalid sql statement and return with error code
L
Liu Jicong 已提交
511

512 513 514
  SSchema *schema = NULL;
  int32_t  numOfCols = 0;
  CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, &schema, &numOfCols, NULL, pRequest->requestId), _return);
L
Liu Jicong 已提交
515

516 517 518
  pStr = qDagToString(pRequest->body.pDag);
  if(pStr == NULL) {
    goto _return;
L
Liu Jicong 已提交
519
  }
520

H
Haojun Liao 已提交
521 522
  printf("%s\n", pStr);

523 524 525 526 527 528 529 530 531 532
  // The topic should be related to a database that the queried table is belonged to.
  SName name = {0};
  char dbName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(&((SQueryStmtInfo*) pQueryNode)->pTableMetaInfo[0]->name, dbName);

  tNameFromString(&name, dbName, T_NAME_ACCT|T_NAME_DB);
  tNameFromString(&name, topicName, T_NAME_TABLE);

  char topicFname[TSDB_TOPIC_FNAME_LEN] = {0};
  tNameExtractFullName(&name, topicFname);
L
Liu Jicong 已提交
533 534

  SCMCreateTopicReq req = {
535 536 537 538 539
    .name         = (char*) topicFname,
    .igExists     = 0,
    .physicalPlan = (char*) pStr,
    .sql          = (char*) sql,
    .logicalPlan  = "no logic plan",
L
Liu Jicong 已提交
540 541
  };

L
Liu Jicong 已提交
542 543 544 545 546
  int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
  void* buf = malloc(tlen);
  if(buf == NULL) {
    goto _return;
  }
547

L
Liu Jicong 已提交
548 549 550
  void* abuf = buf;
  tSerializeSCMCreateTopicReq(&abuf, &req);
  /*printf("formatted: %s\n", dagStr);*/
L
Liu Jicong 已提交
551 552

  pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
553
  pRequest->type = TDMT_MND_CREATE_TOPIC;
L
Liu Jicong 已提交
554

555
  SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
556
  SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
L
Liu Jicong 已提交
557 558

  int64_t transporterId = 0;
559
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
L
Liu Jicong 已提交
560 561 562 563

  tsem_wait(&pRequest->body.rspSem);

_return:
564 565 566 567 568
  qDestroyQuery(pQueryNode);
  if (body != NULL) {
    destroySendMsgInfo(body);
  }

L
Liu Jicong 已提交
569 570 571
  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }
572

L
Liu Jicong 已提交
573 574 575
  return pRequest;
}

L
Liu Jicong 已提交
576
/*typedef SMqConsumeRsp tmq_message_t;*/
L
Liu Jicong 已提交
577

L
Liu Jicong 已提交
578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623
struct tmq_message_t {
  SMqConsumeRsp rsp;
};

tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
  if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) {
    return NULL;
  }
  SRequestObj *pRequest = NULL;
  SMqConsumeReq req = {0};
  req.reqType = 1;
  req.blockingTime = blocking_time;
  req.consumerId = tmq->consumerId;
  strcpy(req.cgroup, tmq->groupId);

  SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
  tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
  strcpy(req.topic, pTopic->topicName);
  int32_t nextVgIdx = pTopic->nextVgIdx;
  pTopic->nextVgIdx = (nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
  SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx);
  req.offset = pVg->currentOffset;

  pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) };
  pRequest->type = TDMT_VND_CONSUME;

  SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);

  return (tmq_message_t*)pRequest->body.resInfo.pData;

  /*tsem_wait(&pRequest->body.rspSem);*/

  /*if (body != NULL) {*/
    /*destroySendMsgInfo(body);*/
  /*}*/

  /*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/
    /*pRequest->code = terrno;*/
  /*}*/

  /*return pRequest;*/
L
Liu Jicong 已提交
624 625
}

L
Liu Jicong 已提交
626 627
tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
  SMqConsumeReq req = {0};
L
Liu Jicong 已提交
628 629 630
  return NULL;
}

L
Liu Jicong 已提交
631 632
void tmq_message_destroy(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return;
L
Liu Jicong 已提交
633 634 635
}


X
Xiaoyu Wang 已提交
636 637 638 639 640 641
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
  STscObj *pTscObj = (STscObj *)taos;
  if (sqlLen > (size_t) tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
642 643
  }

X
Xiaoyu Wang 已提交
644 645
  nPrintTsc("%s", sql)

H
Haojun Liao 已提交
646
  SRequestObj *pRequest = NULL;
647
  SQueryNode  *pQueryNode = NULL;
X
Xiaoyu Wang 已提交
648

X
Xiaoyu Wang 已提交
649
  terrno = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
650
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
651
  CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
H
Haojun Liao 已提交
652

653 654
  if (qIsDdlQuery(pQueryNode)) {
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return);
H
Haojun Liao 已提交
655
  } else {
D
dapan1121 已提交
656 657
    SArray      *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));

658 659
    CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, pNodeList), _return);
    CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
X
Xiaoyu Wang 已提交
660
    pRequest->code = terrno;
X
Xiaoyu Wang 已提交
661 662 663
  }

_return:
664
  qDestroyQuery(pQueryNode);
X
Xiaoyu Wang 已提交
665
  if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
X
Xiaoyu Wang 已提交
666 667
    pRequest->code = terrno;
  }
668

669 670 671
  return pRequest;
}

672
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
673 674
  pEpSet->version = 0;

H
Haojun Liao 已提交
675
  // init mnode ip set
676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707
  SEpSet *mgmtEpSet   = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse    = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(firstEp, mgmtEpSet->fqdn[0], &(mgmtEpSet->port[0]));
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(secondEp, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
    mgmtEpSet->numOfEps++;
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

708
STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
709
  STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
710 711 712 713 714
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

H
Hongze Cheng 已提交
715
  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
716 717 718
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
719 720 721
    return NULL;
  }

722
  SMsgSendInfo* body = buildConnectMsg(pRequest);
723 724

  int64_t transporterId = 0;
H
Haojun Liao 已提交
725
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
726 727 728

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
S
Shengliang Guan 已提交
729
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
730 731 732 733 734 735
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
H
Haojun Liao 已提交
736
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
737 738 739 740 741 742
    destroyRequest(pRequest);
  }

  return pTscObj;
}

743 744 745 746 747 748 749
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
  SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

H
Haojun Liao 已提交
750
  pMsgSendInfo->msgType         = TDMT_MND_CONNECT;
S
Shengliang Guan 已提交
751
  pMsgSendInfo->msgInfo.len     = sizeof(SConnectReq);
752 753
  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId       = pRequest->requestId;
D
catalog  
dapan1121 已提交
754
  pMsgSendInfo->fp              = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
755
  pMsgSendInfo->param           = pRequest;
756

S
Shengliang Guan 已提交
757
  SConnectReq *pConnect = calloc(1, sizeof(SConnectReq));
758
  if (pConnect == NULL) {
759
    tfree(pMsgSendInfo);
760
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
761
    return NULL;
762 763
  }

764 765
  STscObj *pObj = pRequest->pTscObj;

766
  char* db = getConnectionDB(pObj);
767 768 769
  if (db != NULL) {
    tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  }
770
  tfree(db);
771

772 773 774
  pConnect->pid = htonl(appInfo.pid);
  pConnect->startTime = htobe64(appInfo.startTime);
  tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
775

776 777
  pMsgSendInfo->msgInfo.pData = pConnect;
  return pMsgSendInfo;
778 779
}

780
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
H
Haojun Liao 已提交
781
  assert(pMsgBody != NULL);
782 783
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
H
Haojun Liao 已提交
784 785
}

786
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
787 788
  SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
  assert(pMsg->ahandle != NULL);
789

790
  if (pSendInfo->requestObjRefId != 0) {
H
Haojun Liao 已提交
791
    SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
792
    assert(pRequest->self == pSendInfo->requestObjRefId);
793

794 795
    pRequest->metric.rsp = taosGetTimestampMs();
    pRequest->code = pMsg->code;
796

797 798 799 800 801
    STscObj *pTscObj = pRequest->pTscObj;
    if (pEpSet) {
      if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      }
802 803
    }

804
    /*
805 806
   * There is not response callback function for submit response.
   * The actual inserted number of points is the first number.
807
     */
808
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
809
    if (pMsg->code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
810
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
811
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
812
    } else {
H
Haojun Liao 已提交
813
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
814
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
815
    }
816

H
Haojun Liao 已提交
817
    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
818 819
  }

820 821 822 823 824 825 826 827 828 829
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
830 831
  }

832
  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
833
  rpcFreeCont(pMsg->pCont);
834
  destroySendMsgInfo(pSendInfo);
835
}
836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861

TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN]      = {0};
  char dbStr[TSDB_DB_NAME_LEN] = {0};
  char userStr[TSDB_USER_LEN]  = {0};
  char passStr[TSDB_PASSWORD_LEN]   = {0};

  strncpy(ipStr,   ip,   MIN(TSDB_EP_LEN - 1, ipLen));
  strncpy(userStr, user, MIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, MIN(TSDB_PASSWORD_LEN - 1, passLen));
  strncpy(dbStr,   db,   MIN(TSDB_DB_NAME_LEN - 1, dbLen));
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
862 863 864 865
}

void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
866
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
867

H
Haojun Liao 已提交
868 869
  SEpSet epSet = {0};

H
Haojun Liao 已提交
870
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
871
    if (pRequest->type == TDMT_VND_QUERY) {
872 873 874 875 876
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        return NULL;
      }

877
      SReqResultInfo* pResInfo = &pRequest->body.resInfo;
878
      int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void **)&pResInfo->pData);
879 880 881 882
      if (code != TSDB_CODE_SUCCESS) {
        pRequest->code = code;
        return NULL;
      }
H
Haojun Liao 已提交
883

884 885 886 887
      setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
      tscDebug("0x%"PRIx64 " fetch results, numOfRows:%d total Rows:%"PRId64", complete:%d, reqId:0x%"PRIx64, pRequest->self, pResInfo->numOfRows,
               pResInfo->totalRows, pResInfo->completed, pRequest->requestId);

888
      if (pResultInfo->numOfRows == 0) {
D
dapan1121 已提交
889 890 891 892
        return NULL;
      }

      goto _return;
893
    } else if (pRequest->type == TDMT_MND_SHOW) {
H
Haojun Liao 已提交
894
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
H
Haojun Liao 已提交
895
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
H
Haojun Liao 已提交
896 897
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
898 899 900 901 902 903 904 905 906 907 908
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);

      epSet.numOfEps = pVgroupInfo->numOfEps;
      epSet.inUse = pVgroupInfo->inUse;

      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
        epSet.port[i] = pVgroupInfo->epAddr[i].port;
      }

H
Haojun Liao 已提交
909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

H
Haojun Liao 已提交
926 927 928 929 930 931 932 933
      epSet.numOfEps = pVgroupInfo->numOfEps;
      epSet.inUse = pVgroupInfo->inUse;

      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
        epSet.port[i] = pVgroupInfo->epAddr[i].port;
      }

H
Haojun Liao 已提交
934 935
      int64_t  transporterId = 0;
      STscObj *pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
936
      asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
937 938 939
      tsem_wait(&pRequest->body.rspSem);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
940 941
    } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE && pResultInfo->pData != NULL) {
      return NULL;
H
Haojun Liao 已提交
942
    }
H
Haojun Liao 已提交
943

944
    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
H
Haojun Liao 已提交
945

946 947
    int64_t  transporterId = 0;
    STscObj *pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
948
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
949 950 951 952 953 954 955 956 957

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

D
dapan1121 已提交
958 959
_return:

H
Haojun Liao 已提交
960 961 962 963 964
  for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
965
    }
H
Haojun Liao 已提交
966 967 968 969 970 971
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

H
Haojun Liao 已提交
972 973 974 975 976 977 978 979
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row == NULL) {
    pResInfo->row    = calloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol   = calloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
  }
}

980
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
H
Haojun Liao 已提交
981 982 983 984 985
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

H
Haojun Liao 已提交
986 987 988
  // todo check for the failure of malloc
  doPrepareResPtr(pResultInfo);

H
Haojun Liao 已提交
989 990 991
  int32_t offset = 0;
  for (int32_t i = 0; i < numOfCols; ++i) {
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
H
Haojun Liao 已提交
992
    pResultInfo->row[i]    = (char*) (pResultInfo->pData + offset * pResultInfo->numOfRows);
H
Haojun Liao 已提交
993 994
    pResultInfo->pCol[i]   = pResultInfo->row[i];
    offset += pResultInfo->fields[i].bytes;
995
  }
S
Shengliang Guan 已提交
996 997
}

998 999 1000
char* getConnectionDB(STscObj* pObj) {
  char *p = NULL;
  pthread_mutex_lock(&pObj->mutex);
1001 1002 1003 1004
  size_t len = strlen(pObj->db);
  if (len > 0) {
    p = strndup(pObj->db, tListLen(pObj->db));
  }
S
Shengliang Guan 已提交
1005

1006
  pthread_mutex_unlock(&pObj->mutex);
1007 1008 1009 1010 1011 1012 1013 1014 1015
  return p;
}

void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
  pthread_mutex_lock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
1016

1017
void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
1018 1019 1020
  assert(pResultInfo != NULL && pRsp != NULL);

  pResultInfo->pRspMsg   = (const char*) pRsp;
H
Haojun Liao 已提交
1021 1022
  pResultInfo->pData     = (void*) pRsp->data;
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
1023 1024
  pResultInfo->current   = 0;
  pResultInfo->completed = (pRsp->completed == 1);
H
Haojun Liao 已提交
1025

1026
  pResultInfo->totalRows += pResultInfo->numOfRows;
H
Haojun Liao 已提交
1027
  setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
L
fix  
Liu Jicong 已提交
1028
}