clientImpl.c 30.7 KB
Newer Older
1

2
#include "../../libs/scheduler/inc/schedulerInt.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5 6 7
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
8 9
#include "tdef.h"
#include "tep.h"
10
#include "tglobal.h"
11
#include "tmsgtype.h"
12 13
#include "tnote.h"
#include "tpagedfile.h"
14
#include "tref.h"
X
Xiaoyu Wang 已提交
15

16
/*
 * Evaluate `expr` exactly once; when the result is not TSDB_CODE_SUCCESS,
 * store it in terrno and jump to `label`.
 *
 * The temporary uses a name that call sites are unlikely to use, so that an
 * argument such as `code` is not shadowed by the macro's own local (the
 * previous `int32_t code = expr;` turned CHECK_CODE_GOTO(code, l) into a
 * self-initialization). `expr` is parenthesized so that any expression can be
 * passed safely.
 */
#define CHECK_CODE_GOTO(expr, label)                 \
  do {                                               \
    int32_t checkCodeGotoRc_ = (expr);               \
    if (TSDB_CODE_SUCCESS != checkCodeGotoRc_) {     \
      terrno = checkCodeGotoRc_;                     \
      goto label;                                    \
    }                                                \
  } while (0)
24

25
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
26 27
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
28
static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
29

30
/*
 * Return true when `str` is a non-NULL, non-empty string whose length does
 * not exceed `maxsize` bytes (terminator excluded); false otherwise.
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (str == NULL) {
    return false;
  }

  // size_t is unsigned, so "empty" is the only lower-bound violation possible.
  size_t len = strlen(str);
  return len != 0 && len <= maxsize;
}

/* A user name is valid when it is non-empty and at most TSDB_USER_LEN - 1 bytes long. */
static bool validateUserName(const char* user) {
  const size_t maxUserLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxUserLen);
}

static bool validatePassword(const char* passwd) {
48
  return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
49 50 51 52 53 54
}

/* A database name is valid when it is non-empty and at most TSDB_DB_NAME_LEN - 1 bytes long. */
static bool validateDbName(const char* db) {
  const size_t maxDbNameLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxDbNameLen);
}

55 56 57 58 59 60
/*
 * Build the "user:auth:ip:port" key used to look up an application instance.
 *
 * NULL string arguments are formatted as empty fields instead of being passed
 * to "%s" (undefined behaviour). The buffer is sized from the formatted
 * length, so long inputs are no longer silently truncated to 511 bytes.
 *
 * Returns a heap-allocated string the caller must free, or NULL when
 * formatting or allocation fails.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  const char* userPart = (user != NULL) ? user : "";
  const char* authPart = (auth != NULL) ? auth : "";
  const char* hostPart = (ip != NULL) ? ip : "";

  int len = snprintf(NULL, 0, "%s:%s:%s:%d", userPart, authPart, hostPart, (int)port);
  if (len < 0) {
    return NULL;
  }

  char* key = malloc((size_t)len + 1);
  if (key == NULL) {
    return NULL;
  }

  snprintf(key, (size_t)len + 1, "%s:%s:%s:%d", userPart, authPart, hostPart, (int)port);
  return key;
}

H
Haojun Liao 已提交
61
static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
62
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
63 64

TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
65 66 67 68
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

69 70 71 72 73
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

74
  char localDb[TSDB_DB_NAME_LEN] = {0};
75 76 77 78 79 80
  if (db != NULL) {
    if(!validateDbName(db)) {
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

81 82
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
83 84
  }

85
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
86 87 88 89 90 91
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

92
    taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
93 94 95 96
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

97
  SCorEpSet epSet = {0};
98 99 100 101 102 103 104 105 106 107 108 109 110 111
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.port[0] = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }

112
  char* key = getClusterKey(user, secretEncrypt, ip, port);
H
Haojun Liao 已提交
113
  SAppInstInfo** pInst = NULL;
114

H
Haojun Liao 已提交
115 116 117
  pthread_mutex_lock(&appInfo.mutex);

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
118
  if (pInst == NULL) {
H
Haojun Liao 已提交
119 120
    SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
    p->mgmtEp       = epSet;
H
Haojun Liao 已提交
121
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
L
Liu Jicong 已提交
122
    p->pAppHbMgr = appHbMgrInit(p);
H
Haojun Liao 已提交
123
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
124

H
Haojun Liao 已提交
125
    pInst = &p;
126 127
  }

H
Haojun Liao 已提交
128 129
  pthread_mutex_unlock(&appInfo.mutex);

H
Haojun Liao 已提交
130
  tfree(key);
H
Haojun Liao 已提交
131
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst);
132 133
}

X
Xiaoyu Wang 已提交
134 135 136
/*
 * Create a request object for `pTscObj` and attach a lower-cased,
 * NUL-terminated copy of the first `sqlLen` bytes of `sql` to it.
 *
 * The new request is stored in *pRequest (NULL when it could not be created).
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when either the
 * request or the SQL buffer cannot be allocated.
 */
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
  SRequestObj* pReq = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  *pRequest = pReq;
  if (pReq == NULL) {
    tscError("failed to malloc sqlObj");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pReq->sqlstr = malloc(sqlLen + 1);
  if (pReq->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to prepare sql string buffer", pReq->self);
    pReq->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // Store the statement lower-cased and explicitly terminated.
  strntolower(pReq->sqlstr, sql, (int32_t)sqlLen);
  pReq->sqlstr[sqlLen] = 0;
  pReq->sqlLen = sqlLen;

  tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, pReq->self, pReq->sqlstr, pReq->requestId);
  return TSDB_CODE_SUCCESS;
}
155

X
Xiaoyu Wang 已提交
156
/*
 * Parse the SQL text attached to `pRequest` and store the resulting query
 * node in *pQuery.
 *
 * A parse context is filled from the request and its connection, the catalog
 * handle for the connection's cluster is fetched, and the parser is invoked.
 * The database name obtained from getDbOfConnection() is released before
 * returning on every path.
 *
 * Returns the code of catalogGetHandle() when that fails, otherwise the code
 * of qParseQuerySql().
 */
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
  STscObj* pTscObj = pRequest->pTscObj;

  SParseContext cxt = {0};
  cxt.requestId    = pRequest->requestId;
  cxt.acctId       = pTscObj->acctId;
  cxt.db           = getDbOfConnection(pTscObj);
  cxt.pSql         = pRequest->sqlstr;
  cxt.sqlLen       = pRequest->sqlLen;
  cxt.pMsg         = pRequest->msgBuf;
  cxt.msgLen       = ERROR_MSG_BUF_DEFAULT_SIZE;
  cxt.pTransporter = pTscObj->pAppInfo->pTransporter;
  cxt.mgmtEpSet    = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
  if (code == TSDB_CODE_SUCCESS) {
    code = qParseQuerySql(&cxt, pQuery);
  }

  // Single exit: the database name is released whether or not parsing ran.
  tfree(cxt.db);
  return code;
}
H
Haojun Liao 已提交
182

X
Xiaoyu Wang 已提交
183 184 185
/*
 * Send an already-serialized DCL/DDL statement to the server and block until
 * the response semaphore of the request is posted.
 *
 * `pQuery` is treated as an SDclStmtInfo; its message type, payload and end
 * point set are used for the request. For TDMT_VND_SHOW_TABLES the show
 * context of the request is initialized on first use from the statement's
 * extension data.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
  SDclStmtInfo* pStmt = (SDclStmtInfo*)pQuery;

  pRequest->type = pStmt->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pStmt->pMsg, .len = pStmt->msgLen};

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);

  if (pStmt->msgType == TDMT_VND_SHOW_TABLES) {
    SShowReqInfo* pShowInfo = &pRequest->body.showInfo;
    if (pShowInfo->pArray == NULL) {
      pShowInfo->currentIndex = 0;  // start at the first vnode, later ones are iterated afterwards
      pShowInfo->pArray = pStmt->pExtension;
    }
  }

  // Every message type is dispatched to the statement's end point set in the same way.
  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pStmt->epSet, &transporterId, pSendMsg);

  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
208

209
/*
 * Build the query DAG for `pQueryNode` and, for SELECT statements, record the
 * result schema in the request.
 *
 * pDag      - receives the DAG created by qCreateQueryDag()
 * pNodeList - forwarded to qCreateQueryDag()
 *
 * Returns the code of qCreateQueryDag().
 */
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, SArray* pNodeList) {
  pRequest->type = pQueryNode->type;

  SSchema* pSchema = NULL;
  int32_t  numOfCols = 0;
  int32_t  code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId);
  if (code != 0) {
    return code;
  }

  if (pQueryNode->type == TSDB_SQL_SELECT) {
    setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols);
    pRequest->type = TDMT_VND_QUERY;
  }

  // setResSchemaInfo() copies the columns into its own array, so the schema
  // returned by the planner is released on both paths (it used to leak for SELECT).
  tfree(pSchema);
  return code;
}

229 230
/*
 * Copy the name, type and byte width of `numOfCols` schema columns into the
 * field array of `pResInfo`.
 *
 * The field array is sized from its own element type (it was sized from the
 * source SSchema element before). When the allocation fails, numOfCols is
 * reset to 0 and terrno is set to TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  assert(pSchema != NULL && numOfCols > 0);

  pResInfo->numOfCols = numOfCols;
  pResInfo->fields = calloc(numOfCols, sizeof(pResInfo->fields[0]));
  if (pResInfo->fields == NULL) {
    pResInfo->numOfCols = 0;
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    pResInfo->fields[i].bytes = pSchema[i].bytes;
    pResInfo->fields[i].type  = pSchema[i].type;
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
  }
}

242
/*
 * Hand the query DAG to the scheduler.
 *
 * INSERT and CREATE TABLE requests are executed synchronously: the affected
 * row count and result code are copied into the request and the job is freed
 * on success. All other requests are started asynchronously.
 *
 * Returns the result code of the synchronous execution, or the return value
 * of schedulerAsyncExecJob().
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
  if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
    SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};

    int32_t code = schedulerExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
    if (code == TSDB_CODE_SUCCESS && pRequest->body.pQueryJob != NULL) {
      schedulerFreeJob(pRequest->body.pQueryJob);
      pRequest->body.pQueryJob = NULL;  // do not keep a handle to a freed job
    }
    // TODO: retry on failure

    pRequest->body.resInfo.numOfRows = res.numOfRows;

    // A scheduler failure must not be reported as success when the result
    // structure carries no error code of its own.
    if (code != TSDB_CODE_SUCCESS && res.code == TSDB_CODE_SUCCESS) {
      pRequest->code = code;
    } else {
      pRequest->code = res.code;
    }
    return pRequest->code;
  }

  return schedulerAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, pNodeList, pDag, &pRequest->body.pQueryJob);
}
X
Xiaoyu Wang 已提交
261

L
Liu Jicong 已提交
262

L
Liu Jicong 已提交
263
typedef struct SMqClientVg {
L
Liu Jicong 已提交
264 265 266 267 268 269 270 271
  // statistics
  int64_t consumeCnt;
  // offset
  int64_t committedOffset;
  int64_t currentOffset;
  //connection info
  int32_t vgId;
  SEpSet  epSet;
L
Liu Jicong 已提交
272 273 274 275 276 277 278 279 280 281
} SMqClientVg;

/* Client-side record for one subscribed topic. */
typedef struct SMqClientTopic {
  /* subscribe info */
  int32_t sqlLen;
  char*   sql;
  char*   topicName;
  int64_t topicId;
  int32_t nextVgIdx;  // index into `vgs` used by the next poll (advanced round-robin)
  SArray* vgs;        // SArray<SMqClientVg>
} SMqClientTopic;

/* Response status of a tmq operation; passed by value to tmq_commit_cb callbacks. */
typedef struct tmq_resp_err_t {
  int32_t code;  // error code of the response
} tmq_resp_err_t;

L
Liu Jicong 已提交
288 289
/* One (topic, vgroup) pair together with its commit offset. */
typedef struct tmq_topic_vgroup_t {
  char*   topic;         // topic name
  int32_t vgId;          // vgroup id
  int64_t commitOffset;  // commit offset for this pair
} tmq_topic_vgroup_t;

/* Array of tmq_topic_vgroup_t entries. */
typedef struct tmq_topic_vgroup_list_t {
  int32_t             cnt;    // presumably the number of entries in use -- TODO confirm
  int32_t             size;   // presumably the allocated capacity -- TODO confirm
  tmq_topic_vgroup_t* elems;  // entry storage
} tmq_topic_vgroup_list_t;

/*
 * Commit callback type: receives the consumer, a response status, a list of
 * topic/vgroup entries and an opaque user parameter.
 */
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));

L
Liu Jicong 已提交
302 303 304
struct tmq_conf_t {
  char           clientId[256];
  char           groupId[256];
L
Liu Jicong 已提交
305 306 307
  char*          ip;
  uint16_t       port;
  tmq_commit_cb* commit_cb;
L
Liu Jicong 已提交
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
};

/* Allocate a zero-initialized configuration object; returns NULL when allocation fails. */
tmq_conf_t* tmq_conf_new() {
  return calloc(1, sizeof(tmq_conf_t));
}

/*
 * Set one configuration value.
 *
 * Recognized keys are "group.id" and "client.id"; other keys are ignored.
 * strcmp() returns 0 on a match, so the comparisons are made explicit (the
 * previous truthiness test copied the value for every key that did NOT
 * match). Values are copied with a bound so that an over-long value cannot
 * overflow the fixed-size fields; such values are truncated.
 *
 * Returns 0 on success (including unrecognized keys) and -1 when an argument
 * is NULL.
 */
int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
  if (conf == NULL || key == NULL || value == NULL) {
    return -1;
  }

  if (strcmp(key, "group.id") == 0) {
    tstrncpy(conf->groupId, value, sizeof(conf->groupId));
  } else if (strcmp(key, "client.id") == 0) {
    tstrncpy(conf->clientId, value, sizeof(conf->clientId));
  }

  return 0;
}
L
Liu Jicong 已提交
324 325 326 327

struct tmq_t {
  char           groupId[256];
  char           clientId[256];
L
Liu Jicong 已提交
328 329
  int64_t        consumerId;
  int64_t        status;
L
Liu Jicong 已提交
330 331
  STscObj*       pTscObj;
  tmq_commit_cb* commit_cb;
L
Liu Jicong 已提交
332 333
  int32_t        nextTopicIdx;
  SArray*        clientTopics;  //SArray<SMqClientTopic>
L
Liu Jicong 已提交
334 335
};

L
Liu Jicong 已提交
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
/*
 * Create a consumer bound to connection `conn`, taking client id, group id
 * and commit callback from `conf`. A consumer id is derived from
 * generateRequestId() with the top bit cleared.
 *
 * `errstr` and `errstrLen` are currently not used.
 * Returns NULL when the consumer cannot be allocated.
 */
tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* consumer = calloc(sizeof(tmq_t), 1);
  if (consumer != NULL) {
    consumer->pTscObj = (STscObj*)conn;
    consumer->status = 0;
    strcpy(consumer->clientId, conf->clientId);
    strcpy(consumer->groupId, conf->groupId);
    consumer->commit_cb = conf->commit_cb;
    // keep the id non-negative when it is stored as a signed 64-bit value
    consumer->consumerId = generateRequestId() & ((uint64_t)-1 >> 1);
  }
  return consumer;
}

/* Fixed-capacity list of strings stored in a flexible array member. */
struct tmq_list_t {
  int32_t cnt;      // number of entries in use
  int32_t tot;      // number of slots allocated for elems
  char*   elems[];  // entry storage; pointers are stored as given
};
L
Liu Jicong 已提交
355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
/* Allocate an empty list with room for 8 entries; returns NULL when allocation fails. */
tmq_list_t* tmq_list_new() {
  enum { INITIAL_SLOTS = 8 };

  tmq_list_t *list = malloc(sizeof(tmq_list_t) + INITIAL_SLOTS * sizeof(char*));
  if (list != NULL) {
    list->cnt = 0;
    list->tot = INITIAL_SLOTS;
  }
  return list;
}

/*
 * Append `src` to the list. The pointer is stored as-is, not copied.
 *
 * The list has exactly `tot` slots, so it is full when cnt == tot; the
 * previous `tot - 1` bound left the last allocated slot permanently unused.
 *
 * Returns 0 on success, -1 when the list is NULL or full.
 */
int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
  if (ptr == NULL || ptr->cnt >= ptr->tot) {
    return -1;
  }

  ptr->elems[ptr->cnt++] = src;
  return 0;
}


/*
 * Subscribe the consumer to the topics in `topic_list`.
 *
 * Each topic name is duplicated into tmq->clientTopics, a subscribe request is
 * serialized and sent to the management end point set, and the call blocks
 * until the response semaphore is posted.
 *
 * Returns the request object, or NULL when the message buffer or the request
 * could not be allocated (terrno is set in that case).
 */
TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
  // Everything released at _return is declared and initialized up front, so an
  // early goto never reads an indeterminate value.
  SRequestObj*    pRequest = NULL;
  SMsgSendInfo*   body = NULL;
  void*           buf = NULL;
  SCMSubscribeReq req = {0};

  terrno = TSDB_CODE_SUCCESS;  // do not attribute a stale error to this request

  tmq->status = 1;
  int32_t sz = topic_list->cnt;
  tmq->clientTopics = taosArrayInit(sz, sizeof(void*));
  for (int i = 0; i < sz; i++) {
    char* topicName = strdup(topic_list->elems[i]);
    taosArrayPush(tmq->clientTopics, &topicName);
  }

  req.topicNum = taosArrayGetSize(tmq->clientTopics);
  req.consumerId = tmq->consumerId;
  req.consumerGroup = strdup(tmq->groupId);
  req.topicNames = tmq->clientTopics;

  int tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = malloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _return;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  pRequest = createRequest(tmq->pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (pRequest == NULL) {
    tscError("failed to malloc sqlObj");
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tfree(buf);
    goto _return;
  }

  pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
  // NOTE(review): the create-topic message type is reused for a subscribe request -- confirm.
  pRequest->type = TDMT_MND_CREATE_TOPIC;

  body = buildMsgInfoImpl(pRequest);
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);

_return:
  // The group name was only needed for serialization.
  tfree(req.consumerGroup);

  // NOTE(review): confirm that the response path does not already release `body`.
  if (body != NULL) {
    destroySendMsgInfo(body);
  }

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}
L
Liu Jicong 已提交
425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441

/* Install `cb` as the commit callback of `conf`; it is copied into consumers created from this configuration. */
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
  tmq_conf_t* target = conf;
  target->commit_cb = cb;
}

/*
 * Build the key/value list describing a message-queue consumer: one entry
 * with key "mq-tmp" whose value is an SMqHbMsg carrying the consumer id taken
 * from `connKey`.
 *
 * `param` is the tmq_t of the consumer; it is not read yet.
 * TODO: derive the heartbeat status from the consumer's topics (the loop that
 * walked them had no effect and was removed together with its unused local).
 *
 * Returns an SArray<SKv>; NULL when the array or the key cannot be allocated;
 * an empty array when the heartbeat payload cannot be allocated.
 */
SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
  (void)param;

  SArray* pArray = taosArrayInit(0, sizeof(SKv));
  if (pArray == NULL) {
    return NULL;
  }

  SKv kv = {0};
  kv.key = malloc(256);
  if (kv.key == NULL) {
    taosArrayDestroy(pArray);
    return NULL;
  }
  strcpy(kv.key, "mq-tmp");
  kv.keyLen = strlen("mq-tmp") + 1;

  // Zero-initialized so that fields not assigned below hold defined values.
  SMqHbMsg* pMqHb = calloc(1, sizeof(SMqHbMsg));
  if (pMqHb == NULL) {
    tfree(kv.key);  // the key was never stored in the array; release it here
    return pArray;
  }
  pMqHb->consumerId = connKey.connId;

  kv.value = pMqHb;
  kv.valueLen = sizeof(SMqHbMsg);
  taosArrayPush(pArray, &kv);

  return pArray;
}

/*
 * Create a consumer for connection `conn` using the group and client ids from
 * `conf`, and mark the connection as a message-queue connection.
 *
 * The object is zero-initialized: fields that are not assigned here
 * (clientTopics, status, consumerId, ...) must not hold garbage, because
 * tmq_consume_poll() tests clientTopics against NULL.
 *
 * Returns NULL when an argument is NULL or allocation fails.
 */
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
  if (conn == NULL || conf == NULL) {
    return NULL;
  }

  tmq_t* pTmq = calloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }

  tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;

  return pTmq;
}

L
Liu Jicong 已提交
478
/*
 * Create a topic named `topicName` from the query `sql`.
 *
 * The statement is parsed and planned, the plan is rendered to a string, and
 * a create-topic request carrying the full topic name, the plan string and
 * the SQL text is sent to the management end point set. The call blocks until
 * the response semaphore is posted.
 *
 * Returns the request object (its code is set from terrno on failure), or
 * NULL when the arguments are rejected before a request exists.
 */
TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
  // Everything inspected at _return is declared and initialized up front, so the
  // early gotos below never read an indeterminate value.
  STscObj      *pTscObj = (STscObj*)taos;
  SRequestObj  *pRequest = NULL;
  SQueryNode   *pQueryNode = NULL;
  SMsgSendInfo *body = NULL;
  SSchema      *schema = NULL;
  char         *pStr = NULL;

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || topicName == NULL || sql == NULL) {
    // Never pass NULL to "%s".
    tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos,
             (topicName != NULL) ? topicName : "(null)", (sql != NULL) ? sql : "(null)");
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
    tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen < 0) {
    // A negative length would otherwise reach malloc(sqlLen + 1) in buildRequest().
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create topic, %s", topicName);

  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);

  SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo* ) pQueryNode;
  pQueryStmtInfo->info.continueQuery = true;

  // todo check for invalid sql statement and return with error code

  int32_t numOfCols = 0;
  CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, &schema, &numOfCols, NULL, pRequest->requestId), _return);

  pStr = qDagToString(pRequest->body.pDag);
  if (pStr == NULL) {
    // Make sure the failure is visible to the caller.
    if (terrno == TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    goto _return;
  }

  printf("%s\n", pStr);

  // The topic should be related to a database that the queried table is belonged to.
  SName name = {0};
  char dbName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(&((SQueryStmtInfo*) pQueryNode)->pTableMetaInfo[0]->name, dbName);

  tNameFromString(&name, dbName, T_NAME_ACCT|T_NAME_DB);
  tNameFromString(&name, topicName, T_NAME_TABLE);

  char topicFname[TSDB_TOPIC_FNAME_LEN] = {0};
  tNameExtractFullName(&name, topicFname);

  SCMCreateTopicReq req = {
    .name         = (char*) topicFname,
    .igExists     = 0,
    .physicalPlan = (char*) pStr,
    .sql          = (char*) sql,
    .logicalPlan  = "no logic plan",
  };

  int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
  void* buf = malloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _return;
  }

  void* abuf = buf;
  tSerializeSCMCreateTopicReq(&abuf, &req);

  pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
  pRequest->type = TDMT_MND_CREATE_TOPIC;

  body = buildMsgInfoImpl(pRequest);
  SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);

_return:
  qDestroyQuery(pQueryNode);

  // The schema returned by the planner is not used here; getPlan() releases it
  // the same way.
  tfree(schema);

  // NOTE(review): pStr is never released in this function; confirm whether
  // qDagToString() transfers ownership of the string to the caller.

  // NOTE(review): confirm that the response path does not already release `body`.
  if (body != NULL) {
    destroySendMsgInfo(body);
  }

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}

L
Liu Jicong 已提交
577
/*typedef SMqConsumeRsp tmq_message_t;*/
L
Liu Jicong 已提交
578

L
Liu Jicong 已提交
579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624
/* Message handed back by tmq_consume_poll(); wraps the consume response. */
struct tmq_message_t {
  SMqConsumeRsp rsp;  // the consume response
};

/*
 * Poll one vgroup of one subscribed topic for data.
 *
 * Topics and, within a topic, vgroups are visited round-robin through
 * nextTopicIdx / nextVgIdx. A consume request carrying the vgroup's current
 * offset is sent to that vgroup's end point set and the call blocks until the
 * response semaphore is posted.
 *
 * Returns the result data of the request cast to tmq_message_t*, or NULL when
 * the consumer has no topics, the selected topic has no vgroups, or an
 * allocation fails (terrno is set for allocation failures).
 *
 * NOTE(review): the request object is not released here; confirm who owns it
 * and the returned data.
 */
tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
  if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) {
    return NULL;
  }

  SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
  tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);

  // Without vgroups there is nothing to poll, and the modulo below would divide by zero.
  if (pTopic->vgs == NULL || taosArrayGetSize(pTopic->vgs) == 0) {
    return NULL;
  }

  int32_t nextVgIdx = pTopic->nextVgIdx;
  pTopic->nextVgIdx = (nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
  SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx);

  // The payload is heap-allocated like the other request payloads in this file
  // (tmq_subscribe, taos_create_topic), instead of pointing the request at a
  // stack variable.
  SMqConsumeReq* pReq = calloc(1, sizeof(SMqConsumeReq));
  if (pReq == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
  pReq->reqType = 1;
  pReq->blockingTime = blocking_time;
  pReq->consumerId = tmq->consumerId;
  pReq->offset = pVg->currentOffset;
  // cgroup and topic are treated as fixed-size arrays, as the original strcpy did.
  tstrncpy(pReq->cgroup, tmq->groupId, sizeof(pReq->cgroup));
  tstrncpy(pReq->topic, pTopic->topicName, sizeof(pReq->topic));

  // The request object used to stay NULL and was dereferenced right below.
  SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
  if (pRequest == NULL) {
    tscError("failed to malloc sqlObj");
    free(pReq);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq) };
  pRequest->type = TDMT_VND_CONSUME;

  SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);

  return (tmq_message_t*)pRequest->body.resInfo.pData;
}

L
Liu Jicong 已提交
627 628
/*
 * Commit offsets for the given topic/vgroup list.
 *
 * Not implemented yet: no request is sent and NULL is always returned.
 */
tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
  (void)tmq;
  (void)tmq_topic_vgroup_list;
  (void)async;

  tmq_resp_err_t* notImplemented = NULL;
  return notImplemented;
}

L
Liu Jicong 已提交
632 633
/*
 * Release a message returned by tmq_consume_poll().
 *
 * Currently a no-op for NULL and non-NULL messages alike.
 */
void tmq_message_destroy(tmq_message_t* tmq_message) {
  if (tmq_message != NULL) {
    /* nothing is released yet */
  }
}


X
Xiaoyu Wang 已提交
637 638 639 640 641 642
/*
 * Execute the first `sqlLen` bytes of `sql` on connection `taos`.
 *
 * The statement is turned into a request and parsed. DDL statements are sent
 * directly; everything else is planned and handed to the scheduler. The first
 * failing step stores its code in terrno and skips the remaining steps.
 *
 * Returns the request object (its code is set from terrno on failure), or
 * NULL when the statement exceeds tsMaxSQLStringLen or no request could be
 * created.
 */
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
  STscObj *pTscObj = (STscObj *)taos;
  if (sqlLen > (size_t) tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  nPrintTsc("%s", sql)

  SRequestObj *pRequest = NULL;
  SQueryNode  *pQueryNode = NULL;
  SArray      *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));

  terrno = TSDB_CODE_SUCCESS;

  // Each step only runs when every previous step succeeded.
  int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseSql(pRequest, &pQueryNode);
  }

  if (TSDB_CODE_SUCCESS == code) {
    if (qIsDdlQuery(pQueryNode)) {
      code = execDdlQuery(pRequest, pQueryNode);
    } else {
      code = getPlan(pRequest, pQueryNode, &pRequest->body.pDag, pNodeList);
      if (TSDB_CODE_SUCCESS == code) {
        code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
      }
      if (TSDB_CODE_SUCCESS == code) {
        pRequest->code = terrno;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
  }

  taosArrayDestroy(pNodeList);
  qDestroyQuery(pQueryNode);
  if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
    pRequest->code = terrno;
  }

  return pRequest;
}

674
/*
 * Fill the end point set of `pEpSet` from up to two "fqdn:port" strings.
 *
 * NULL or empty strings are skipped; the remaining ones are parsed into
 * consecutive slots, first end point first.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_TSC_INVALID_FQDN when an end point is TSDB_EP_LEN bytes or longer,
 * or when no end point was given at all.
 */
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet *pMgmtSet = &(pEpSet->epSet);
  pMgmtSet->numOfEps = 0;
  pMgmtSet->inUse    = 0;

  const char *candidates[2] = {firstEp, secondEp};
  for (int32_t i = 0; i < 2; ++i) {
    const char *ep = candidates[i];
    if (ep == NULL || ep[0] == 0) {
      continue;
    }

    if (strlen(ep) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    // The next free slot is always the current number of end points.
    taosGetFqdnPortFromEp(ep, pMgmtSet->fqdn[pMgmtSet->numOfEps], &(pMgmtSet->port[pMgmtSet->numOfEps]));
    pMgmtSet->numOfEps++;
  }

  if (pMgmtSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

H
Haojun Liao 已提交
710
/*
 * Create a connection object on top of `pAppInfo`, send a connect request to
 * the management end point set and wait for the response.
 *
 * fp / param are forwarded to createRequest().
 *
 * Returns the connection object, or NULL when an object could not be created,
 * the connect message could not be built, or the server rejected the
 * connection. terrno is set when an allocation fails.
 */
STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
  STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgSendInfo* body = buildConnectMsg(pRequest);
  if (body == NULL) {
    // Nothing can be sent, so no response will ever post the semaphore. Clean up
    // like the failed-connect path below and keep the error code that
    // buildConnectMsg() stored in terrno.
    int32_t code = terrno;
    destroyRequest(pRequest);
    taos_close(pTscObj);
    terrno = code;
    return NULL;
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
    destroyRequest(pRequest);
  }

  return pTscObj;
}

745 746 747 748 749 750 751
/*
 * Build the send-info for a TDMT_MND_CONNECT message of `pRequest`.
 *
 * The payload is an SConnectReq holding the connection's database name (when
 * it has one), the process id and start time in network byte order, and the
 * application name.
 *
 * Returns the send-info, or NULL with terrno set to
 * TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation fails.
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
  SMsgSendInfo *pSendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pSendInfo->msgType         = TDMT_MND_CONNECT;
  pSendInfo->msgInfo.len     = sizeof(SConnectReq);
  pSendInfo->requestObjRefId = pRequest->self;
  pSendInfo->requestId       = pRequest->requestId;
  pSendInfo->fp              = handleRequestRspFp[TMSG_INDEX(pSendInfo->msgType)];
  pSendInfo->param           = pRequest;

  SConnectReq *pConnReq = calloc(1, sizeof(SConnectReq));
  if (pConnReq == NULL) {
    tfree(pSendInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  // The database name is only needed for the copy below.
  char* dbName = getDbOfConnection(pRequest->pTscObj);
  if (dbName != NULL) {
    tstrncpy(pConnReq->db, dbName, sizeof(pConnReq->db));
  }
  tfree(dbName);

  pConnReq->pid       = htonl(appInfo.pid);
  pConnReq->startTime = htobe64(appInfo.startTime);
  tstrncpy(pConnReq->app, appInfo.appName, tListLen(pConnReq->app));

  pSendInfo->msgInfo.pData = pConnReq;
  return pSendInfo;
}

782
/* Release a send-info object together with its message payload; `pMsgBody` must not be NULL. */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(pMsgBody != NULL);

  // payload first, then the envelope that points at it
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
}

788
/*
 * Response handler invoked for every message received from the server.
 *
 * parent : not used by this function.
 * pMsg   : the received message; pMsg->ahandle must be the SMsgSendInfo that
 *          was supplied when the request was sent.
 * pEpSet : optional endpoint set reported with the response; when it differs
 *          from the cached mnode endpoint set, the cache is updated.
 *
 * The function records the response time/code on the originating request,
 * logs the outcome, hands a private copy of the response body to the
 * registered callback (pSendInfo->fp), and finally frees pMsg->pCont and the
 * send-info object. The copied body (buf.pData) is not freed here.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
  assert(pMsg->ahandle != NULL);

  if (pSendInfo->requestObjRefId != 0) {
    SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);

    // The reference lookup can come back empty (e.g. the request has already
    // been released); never dereference it blindly.
    if (pRequest != NULL) {
      assert(pRequest->self == pSendInfo->requestObjRefId);

      pRequest->metric.rsp = taosGetTimestampMs();
      pRequest->code = pMsg->code;

      STscObj *pTscObj = pRequest->pTscObj;
      if (pEpSet) {
        if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
          updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
        }
      }

      // Round-trip time of the request, for the log lines below.
      int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
      if (pMsg->code == TSDB_CODE_SUCCESS) {
        tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
            TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
      } else {
        tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
            TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
      }

      taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    } else {
      tscDebug("request refId:0x%" PRIx64 " not found, skip response bookkeeping", (uint64_t)pSendInfo->requestObjRefId);
    }
  }

  // Give the callback its own copy of the body: pMsg->pCont is freed below.
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
      buf.len = 0;  // keep (len, pData) consistent: there is no payload to read
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858

/*
 * Connect to a server using pre-computed authentication data instead of a
 * plain password.
 *
 * ip   : server address; may be NULL (it is forwarded unchanged).
 * user : user name; NULL selects TSDB_DEFAULT_USER.
 * auth : authentication data; must not be NULL.
 * db   : database to select; may be NULL (it is forwarded unchanged).
 * port : server port.
 *
 * Returns the handle produced by taos_connect_internal, or NULL when no
 * auth data is supplied.
 */
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  // Passing NULL for a %s conversion is undefined behaviour, so substitute a
  // printable placeholder for the optional arguments before logging.
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", (ip != NULL) ? ip : "(null)", port, user,
           (db != NULL) ? db : "(null)");

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

/*
 * Copy at most srcLen bytes of src into dst (capacity dstSize), always
 * leaving dst NUL-terminated. A NULL source or a non-positive length leaves
 * dst untouched.
 */
static void copyLengthBoundedStr(char *dst, size_t dstSize, const char *src, int srcLen) {
  if (dst == NULL || dstSize == 0 || src == NULL || srcLen <= 0) {
    return;
  }

  size_t n = TMIN(dstSize - 1, (size_t)srcLen);
  strncpy(dst, src, n);
  dst[n] = '\0';
}

/*
 * Variant of taos_connect for callers that pass each string together with an
 * explicit length (the strings need not be NUL-terminated).
 *
 * Every argument is copied into a zero-initialised, fixed-size local buffer,
 * truncated to the buffer capacity, and the resulting C strings are passed to
 * taos_connect. A NULL pointer or a non-positive length yields an empty
 * string for that argument.
 */
TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN]         = {0};
  char dbStr[TSDB_DB_NAME_LEN]    = {0};
  char userStr[TSDB_USER_LEN]     = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};

  copyLengthBoundedStr(ipStr,   sizeof(ipStr),   ip,   ipLen);
  copyLengthBoundedStr(userStr, sizeof(userStr), user, userLen);
  copyLengthBoundedStr(passStr, sizeof(passStr), pass, passLen);
  copyLengthBoundedStr(dbStr,   sizeof(dbStr),   db,   dbLen);

  return taos_connect(ipStr, userStr, passStr, dbStr, port);
}

void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
868
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
869

H
Haojun Liao 已提交
870 871
  SEpSet epSet = {0};

H
Haojun Liao 已提交
872
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
873
    if (pRequest->type == TDMT_VND_QUERY) {
874 875 876 877 878
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        return NULL;
      }

879
      SReqResultInfo* pResInfo = &pRequest->body.resInfo;
880
      int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void **)&pResInfo->pData);
881 882 883 884
      if (code != TSDB_CODE_SUCCESS) {
        pRequest->code = code;
        return NULL;
      }
H
Haojun Liao 已提交
885

886 887 888 889
      setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
      tscDebug("0x%"PRIx64 " fetch results, numOfRows:%d total Rows:%"PRId64", complete:%d, reqId:0x%"PRIx64, pRequest->self, pResInfo->numOfRows,
               pResInfo->totalRows, pResInfo->completed, pRequest->requestId);

890
      if (pResultInfo->numOfRows == 0) {
D
dapan1121 已提交
891 892 893 894
        return NULL;
      }

      goto _return;
895
    } else if (pRequest->type == TDMT_MND_SHOW) {
H
Haojun Liao 已提交
896
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
H
Haojun Liao 已提交
897
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
H
Haojun Liao 已提交
898 899
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
900 901 902 903 904 905 906 907 908 909 910
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);

      epSet.numOfEps = pVgroupInfo->numOfEps;
      epSet.inUse = pVgroupInfo->inUse;

      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
        epSet.port[i] = pVgroupInfo->epAddr[i].port;
      }

H
Haojun Liao 已提交
911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

H
Haojun Liao 已提交
928 929 930 931 932 933 934 935
      epSet.numOfEps = pVgroupInfo->numOfEps;
      epSet.inUse = pVgroupInfo->inUse;

      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
        strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
        epSet.port[i] = pVgroupInfo->epAddr[i].port;
      }

H
Haojun Liao 已提交
936 937
      int64_t  transporterId = 0;
      STscObj *pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
938
      asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
939 940 941
      tsem_wait(&pRequest->body.rspSem);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
942 943
    } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE && pResultInfo->pData != NULL) {
      return NULL;
H
Haojun Liao 已提交
944
    }
H
Haojun Liao 已提交
945

946
    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
H
Haojun Liao 已提交
947

948 949
    int64_t  transporterId = 0;
    STscObj *pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
950
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
951 952 953 954 955 956 957 958 959

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

D
dapan1121 已提交
960 961
_return:

H
Haojun Liao 已提交
962 963 964 965 966
  for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
967
    }
H
Haojun Liao 已提交
968 969 970 971 972 973
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

H
Haojun Liao 已提交
974 975 976 977 978 979 980 981
/*
 * Allocate the per-column bookkeeping arrays (row, pCol, length) of a result
 * info on first use. Nothing is done when `row` is already set. Allocation
 * failures are not detected here.
 */
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return;
  }

  pResInfo->row    = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol   = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
}

982
/*
 * Compute, for every column, where its data starts inside the result block
 * pResultInfo->pData and record the column width.
 *
 * Column i starts at pData + (sum of the widths of columns 0..i-1) * numOfRows,
 * with widths and row count taken from pResultInfo itself. Does nothing when
 * numOfRows is 0.
 */
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

  // todo check for the failure of malloc
  doPrepareResPtr(pResultInfo);

  int32_t precedingBytes = 0;  // total width of the columns already placed
  for (int32_t col = 0; col < numOfCols; ++col) {
    int32_t colBytes = pResultInfo->fields[col].bytes;
    char*   colStart = (char*) (pResultInfo->pData + precedingBytes * pResultInfo->numOfRows);

    pResultInfo->length[col] = colBytes;
    pResultInfo->row[col]    = colStart;
    pResultInfo->pCol[col]   = pResultInfo->row[col];

    precedingBytes += colBytes;
  }
}

H
Haojun Liao 已提交
1000
/*
 * Return a heap-allocated copy of the connection's current database name,
 * read under the connection mutex, or NULL when no database is set (or the
 * copy could not be allocated). The caller frees the returned string.
 */
char* getDbOfConnection(STscObj* pObj) {
  char *dbCopy = NULL;

  pthread_mutex_lock(&pObj->mutex);
  if (pObj->db[0] != '\0') {
    dbCopy = strndup(pObj->db, tListLen(pObj->db));
  }
  pthread_mutex_unlock(&pObj->mutex);

  return dbCopy;
}

/*
 * Set the current database name of a connection.
 * Both arguments must be non-NULL. The name is copied into pTscObj->db,
 * bounded by the size of that buffer, while holding the connection mutex.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
  pthread_mutex_lock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
1018

1019
/*
 * Install a retrieve-table response as the current result block of
 * pResultInfo: remember the raw response, point pData at its payload, reset
 * the row cursor, take over the row count (converted with htonl) and the
 * completion flag, add the block's rows to totalRows, and recompute the
 * per-column data pointers.
 */
void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
  assert(pResultInfo != NULL && pRsp != NULL);

  // Cursor and completion state for the new block.
  pResultInfo->current   = 0;
  pResultInfo->completed = (pRsp->completed == 1);

  // Raw response and its payload.
  pResultInfo->pRspMsg   = (const char*) pRsp;
  pResultInfo->pData     = (void*) pRsp->data;

  // Row accounting.
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
  pResultInfo->totalRows += pResultInfo->numOfRows;

  setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
}