tscAsync.c 15.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "os.h"
H
hjxilinx 已提交
17 18 19
#include "tutil.h"

#include "tnote.h"
H
hzcheng 已提交
20
#include "trpc.h"
S
slguan 已提交
21
#include "tscLog.h"
H
hzcheng 已提交
22
#include "tscProfile.h"
H
hjxilinx 已提交
23
#include "tscSubquery.h"
H
hzcheng 已提交
24 25
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
S
slguan 已提交
26
#include "tsched.h"
H
hjxilinx 已提交
27
#include "tschemautil.h"
H
hjxilinx 已提交
28
#include "tsclient.h"
H
hzcheng 已提交
29

30 31
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
32 33 34 35

static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)());

/*
36 37 38
 * Proxy function to perform sequentially query&retrieve operation.
 * If sql queries upon a super table and two-stage merge procedure is not involved (when employ the projection
 * query), it will sequentially query&retrieve data for all vnodes
H
hzcheng 已提交
39
 */
40 41
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
42

H
[td-99]  
hjxilinx 已提交
43
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
H
hzcheng 已提交
44 45
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
46
  
H
hzcheng 已提交
47
  pSql->signature = pSql;
H
hjxilinx 已提交
48 49 50
  pSql->param     = param;
  pSql->pTscObj   = pObj;
  pSql->maxRetry  = TSDB_MAX_REPLICA_NUM;
H
hzcheng 已提交
51
  pSql->fp = fp;
52
  
S
slguan 已提交
53 54
  if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
    tscError("failed to malloc payload");
H
[td-99]  
hjxilinx 已提交
55
    tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
H
hzcheng 已提交
56 57
    return;
  }
58
  
59
  pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
H
hzcheng 已提交
60 61
  if (pSql->sqlstr == NULL) {
    tscError("%p failed to malloc sql string buffer", pSql);
H
[td-99]  
hjxilinx 已提交
62
    tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
63
    free(pCmd->payload);
H
hzcheng 已提交
64 65
    return;
  }
66
  
H
hzcheng 已提交
67 68
  pRes->qhandle = 0;
  pRes->numOfRows = 1;
69
  
S
slguan 已提交
70
  strtolower(pSql->sqlstr, sqlstr);
71
  tscDump("%p SQL: %s", pSql, pSql->sqlstr);
72
  
73
  int32_t code = tsParseSql(pSql, true);
H
hzcheng 已提交
74
  if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
75
  
H
hzcheng 已提交
76
  if (code != TSDB_CODE_SUCCESS) {
H
[td-99]  
hjxilinx 已提交
77
    pSql->res.code = code;
H
hzcheng 已提交
78 79 80
    tscQueueAsyncRes(pSql);
    return;
  }
81
  
H
hzcheng 已提交
82 83 84
  tscDoQuery(pSql);
}

85
// TODO return the correct error code to client in tscQueueAsyncError
H
hjxilinx 已提交
86
void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
87 88 89
  STscObj *pObj = (STscObj *)taos;
  if (pObj == NULL || pObj->signature != pObj) {
    tscError("bug!!! pObj:%p", pObj);
H
[td-99]  
hjxilinx 已提交
90 91
    terrno = TSDB_CODE_DISCONNECTED;
    tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
92 93 94 95 96 97
    return;
  }
  
  int32_t sqlLen = strlen(sqlstr);
  if (sqlLen > tsMaxSQLStringLen) {
    tscError("sql string too long");
H
[td-99]  
hjxilinx 已提交
98 99
    terrno = TSDB_CODE_INVALID_SQL;
    tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_SQL);
100 101 102 103 104 105 106 107
    return;
  }
  
  taosNotePrintTsc(sqlstr);
  
  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (pSql == NULL) {
    tscError("failed to malloc sqlObj");
H
[td-99]  
hjxilinx 已提交
108 109
    terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
    tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
110 111 112 113 114 115
    return;
  }
  
  doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen);
}

116
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
H
hzcheng 已提交
117 118 119 120 121 122 123 124
  if (tres == NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

125 126 127 128
  if (numOfRows == 0) {
    if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
    } else {
129 130 131 132 133 134 135 136 137
      /*
       * all available virtual node has been checked already, now we need to check
       * for the next subclause queries
       */
      if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
        tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
        return;
      }

138 139 140 141 142
      /*
       * 1. has reach the limitation
       * 2. no remain virtual nodes to be retrieved anymore
       */
      (*pSql->fetchFp)(param, pSql, 0);
H
hzcheng 已提交
143
    }
144 145 146 147
    
    return;
  }
  
148
  // local merge has handle this situation during super table non-projection query.
H
hjxilinx 已提交
149
  if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
H
Haojun Liao 已提交
150
    pRes->numOfClauseTotal += pRes->numOfRows;
H
hzcheng 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
  }

  (*pSql->fetchFp)(param, tres, numOfRows);
}

// actual continue retrieve function with user-specified callback function
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)()) {
  SSqlObj *pSql = (SSqlObj *)tres;
  if (pSql == NULL) {  // error
    tscError("sql object is NULL");
    return;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

167
  if ((pRes->qhandle == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) {
H
hzcheng 已提交
168 169 170 171 172 173
    if (pRes->qhandle == 0) {
      tscError("qhandle is NULL");
    } else {
      pRes->code = numOfRows;
    }

H
[td-99]  
hjxilinx 已提交
174
    tscQueueAsyncError(pSql->fetchFp, param, pRes->code);
H
hzcheng 已提交
175 176 177 178
    return;
  }

  pSql->fp = fp;
H
hjxilinx 已提交
179
  if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
H
hzcheng 已提交
180 181 182 183 184 185
    pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
  }
  tscProcessSql(pSql);
}

/*
186 187 188
 * retrieve callback for fetch rows proxy.
 * The below two functions both serve as the callback function of query virtual node.
 * query callback first, and then followed by retrieve callback
H
hzcheng 已提交
189
 */
190 191
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  // query completed, continue to retrieve
192
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
H
hzcheng 已提交
193 194
}

195 196
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  // query completed, continue to retrieve
197
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy);
H
hzcheng 已提交
198 199 200 201 202 203
}

void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
H
[td-99]  
hjxilinx 已提交
204
    tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
H
hzcheng 已提交
205 206 207 208 209 210 211 212
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
H
[td-99]  
hjxilinx 已提交
213
    tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE);
H
hzcheng 已提交
214 215 216 217 218
    return;
  }

  // user-defined callback function is stored in fetchFp
  pSql->fetchFp = fp;
219
  pSql->fp = tscAsyncFetchRowsProxy;
H
hzcheng 已提交
220 221

  pSql->param = param;
S
slguan 已提交
222
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
223 224
  
  // handle the sub queries of join query
225
  if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
H
hjxilinx 已提交
226
    tscFetchDatablockFromSubquery(pSql);
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
  } else if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) {
    if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
      return;
    } else {
      /*
       * all available virtual node has been checked already, now we need to check
       * for the next subclause queries
       */
      if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
        tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
        return;
      }
    
      /*
       * 1. has reach the limitation
       * 2. no remain virtual nodes to be retrieved anymore
       */
      (*pSql->fetchFp)(param, pSql, 0);
    }
    return;
  } else { // current query is not completed, continue retrieve from node
H
hjxilinx 已提交
249
    if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
H
hjxilinx 已提交
250 251 252 253
      pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }
  
    tscProcessSql(pSql);
H
hzcheng 已提交
254 255 256 257 258 259 260
  }
}

void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
H
[td-99]  
hjxilinx 已提交
261
    tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
H
hzcheng 已提交
262 263 264 265 266 267 268 269
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
H
[td-99]  
hjxilinx 已提交
270
    tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE);
H
hzcheng 已提交
271 272 273 274 275
    return;
  }

  pSql->fetchFp = fp;
  pSql->param = param;
276
  
H
hzcheng 已提交
277
  if (pRes->row >= pRes->numOfRows) {
S
slguan 已提交
278
    tscResetForNextRetrieve(pRes);
279
    pSql->fp = tscAsyncFetchSingleRowProxy;
280
    
H
hjxilinx 已提交
281
    if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
282 283 284
      pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }
    
H
hzcheng 已提交
285 286 287 288 289 290 291 292 293 294 295
    tscProcessSql(pSql);
  } else {
    SSchedMsg schedMsg;
    schedMsg.fp = tscProcessFetchRow;
    schedMsg.ahandle = pSql;
    schedMsg.thandle = pRes->tsrow;
    schedMsg.msg = NULL;
    taosScheduleTask(tscQhandle, &schedMsg);
  }
}

296
void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
H
hzcheng 已提交
297 298 299 300
  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

301
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
302
  
H
hzcheng 已提交
303
  if (numOfRows == 0) {
304 305 306
    if (hasMoreVnodesToTry(pSql)) {     // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode);
    } else {
H
hzcheng 已提交
307
      /*
308 309
       * 1. has reach the limitation
       * 2. no remain virtual nodes to be retrieved anymore
H
hzcheng 已提交
310 311 312
       */
      (*pSql->fetchFp)(pSql->param, pSql, NULL);
    }
313
    return;
H
hzcheng 已提交
314
  }
315
  
316
  for (int i = 0; i < pCmd->numOfCols; ++i){
H
hjxilinx 已提交
317 318 319
    SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);
    if (pSup->pSqlExpr != NULL) {
//      pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row;
320 321 322 323 324
    } else {
      //todo add
    }
  }
  
325 326 327
  pRes->row++;

  (*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow);
H
hzcheng 已提交
328 329 330 331 332 333
}

void tscProcessFetchRow(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
334
  
H
hjxilinx 已提交
335
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
336

337
  for (int i = 0; i < pCmd->numOfCols; ++i) {
H
hjxilinx 已提交
338 339 340
    SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);

    if (pSup->pSqlExpr != NULL) {
341
      tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
342
    } else {
H
hjxilinx 已提交
343
//      todo add
344
    }
345 346
  }
  
H
hzcheng 已提交
347 348 349 350 351 352 353 354 355 356 357 358 359
  pRes->row++;
  (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
}

void tscProcessAsyncRes(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  void *taosres = pSql;

  // pCmd may be released, so cache pCmd->command
  int cmd = pCmd->command;
H
[TD-98]  
hjxilinx 已提交
360
  int code = pRes->code;
H
hzcheng 已提交
361 362

  // in case of async insert, restore the user specified callback function
H
hjxilinx 已提交
363
  bool shouldFree = tscShouldBeFreed(pSql);
H
hzcheng 已提交
364 365 366 367 368 369 370 371 372

  if (cmd == TSDB_SQL_INSERT) {
    assert(pSql->fp != NULL);
    pSql->fp = pSql->fetchFp;
  }

  (*pSql->fp)(pSql->param, taosres, code);

  if (shouldFree) {
373
    tscTrace("%p sqlObj is automatically freed in async res", pSql);
sangshuduo's avatar
sangshuduo 已提交
374
    tscFreeSqlObj(pSql);
H
hzcheng 已提交
375 376 377
  }
}

H
[td-99]  
hjxilinx 已提交
378
static void tscProcessAsyncError(SSchedMsg *pMsg) {
H
hzcheng 已提交
379
  void (*fp)() = pMsg->ahandle;
H
[td-99]  
hjxilinx 已提交
380
  (*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg);
H
hzcheng 已提交
381 382
}

H
[td-99]  
hjxilinx 已提交
383 384 385 386
void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
  int32_t* c = malloc(sizeof(int32_t));
  *c = code;
  
H
hzcheng 已提交
387 388 389 390
  SSchedMsg schedMsg;
  schedMsg.fp = tscProcessAsyncError;
  schedMsg.ahandle = fp;
  schedMsg.thandle = param;
H
[td-99]  
hjxilinx 已提交
391
  schedMsg.msg = c;
H
hzcheng 已提交
392 393 394 395 396 397 398 399
  taosScheduleTask(tscQhandle, &schedMsg);
}

void tscQueueAsyncRes(SSqlObj *pSql) {
  if (pSql == NULL || pSql->signature != pSql) {
    tscTrace("%p SqlObj is freed, not add into queue async res", pSql);
    return;
  } else {
H
[td-32]  
hjxilinx 已提交
400
    tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
H
hzcheng 已提交
401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
  }

  SSchedMsg schedMsg;
  schedMsg.fp = tscProcessAsyncRes;
  schedMsg.ahandle = pSql;
  schedMsg.thandle = (void *)1;
  schedMsg.msg = NULL;
  taosScheduleTask(tscQhandle, &schedMsg);
}

void tscProcessAsyncFree(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  tscTrace("%p sql is freed", pSql);
  taos_free_result(pSql);
}

void tscQueueAsyncFreeResult(SSqlObj *pSql) {
  tscTrace("%p sqlObj put in queue to async free", pSql);

  SSchedMsg schedMsg;
  schedMsg.fp = tscProcessAsyncFree;
  schedMsg.ahandle = pSql;
  schedMsg.thandle = (void *)1;
  schedMsg.msg = NULL;
  taosScheduleTask(tscQhandle, &schedMsg);
}

int tscSendMsgToServer(SSqlObj *pSql);

H
hjxilinx 已提交
430
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
H
hzcheng 已提交
431 432 433 434 435 436
  SSqlObj *pSql = (SSqlObj *)param;
  if (pSql == NULL || pSql->signature != pSql) return;

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

H
[td-32]  
hjxilinx 已提交
437 438
  if (code != TSDB_CODE_SUCCESS) {
    pRes->code = code;
H
hzcheng 已提交
439 440 441 442 443
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pSql->pStream == NULL) {
H
hjxilinx 已提交
444
    // check if it is a sub-query of super table query first, if true, enter another routine
H
hjxilinx 已提交
445
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
446 447
  
    if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
H
hjxilinx 已提交
448
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
L
lihui 已提交
449 450 451 452
      if (pTableMetaInfo->pTableMeta == NULL){
        code = tscGetTableMeta(pSql, pTableMetaInfo);
        assert(code == TSDB_CODE_SUCCESS);      
      }     
L
lihui 已提交
453
      
H
hjxilinx 已提交
454
      assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vgroupIndex >= 0 && pSql->param != NULL);
H
hzcheng 已提交
455 456 457

      SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
      SSqlObj *         pParObj = trs->pParentSqlObj;
458
      
H
hjxilinx 已提交
459
      assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
H
hjxilinx 已提交
460
          tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);
S
slguan 已提交
461

L
lihui 已提交
462
      tscTrace("%p get metricMeta during super table query successfully", pSql);      
H
hzcheng 已提交
463

H
hjxilinx 已提交
464
      code = tscGetSTableVgroupInfo(pSql, 0);
H
hzcheng 已提交
465 466 467 468
      pRes->code = code;

      if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
    } else {  // normal async query continues
469
      if (pCmd->parseFinished) {
470
        tscTrace("%p re-send data to vnode in table Meta callback since sql parsed completed", pSql);
L
[#1083]  
lihui 已提交
471
        
472
        STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
473
        code = tscGetTableMeta(pSql, pTableMetaInfo);
L
[#1083]  
lihui 已提交
474 475
        assert(code == TSDB_CODE_SUCCESS);
      
H
hjxilinx 已提交
476
        if (pTableMetaInfo->pTableMeta) {
477 478
          // todo update the submit message according to the new table meta
          // 1. table uid, 2. ip address
L
[#1083]  
lihui 已提交
479 480 481 482
          code = tscSendMsgToServer(pSql);
          if (code == TSDB_CODE_SUCCESS) return;
        }
      } else {
H
hjxilinx 已提交
483
        code = tsParseSql(pSql, false);
L
[#1083]  
lihui 已提交
484 485
        if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
      }
H
hzcheng 已提交
486
    }
S
slguan 已提交
487

H
hzcheng 已提交
488
  } else {  // stream computing
489
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
490
    code = tscGetTableMeta(pSql, pTableMetaInfo);
H
hzcheng 已提交
491 492 493 494
    pRes->code = code;

    if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;

weixin_48148422's avatar
weixin_48148422 已提交
495
    if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
496
      code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
H
hzcheng 已提交
497 498 499 500 501 502
      pRes->code = code;

      if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
    }
  }

503 504
  if (code != TSDB_CODE_SUCCESS) {
    pSql->res.code = code;
H
hzcheng 已提交
505 506 507 508 509 510 511 512
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pSql->pStream) {
    tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
    /*
     * NOTE:
H
hjxilinx 已提交
513
     * transfer the sql function for super table query before get meter/metric meta,
514
     * since in callback functions, only tscProcessSql(pStream->pSql) is executed!
H
hzcheng 已提交
515
     */
H
hjxilinx 已提交
516
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
517
    
H
hjxilinx 已提交
518
    tscTansformSQLFuncForSTableQuery(pQueryInfo);
H
hzcheng 已提交
519 520
    tscIncStreamExecutionCount(pSql->pStream);
  } else {
H
hjxilinx 已提交
521
    tscTrace("%p get tableMeta successfully", pSql);
H
hzcheng 已提交
522 523 524 525
  }

  tscDoQuery(pSql);
}