tscAsync.c 16.9 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "os.h"
H
hzcheng 已提交
17 18 19 20 21 22 23 24 25

#include "tlog.h"
#include "trpc.h"
#include "tscProfile.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "tutil.h"
L
lihui 已提交
26
#include "tnote.h"
S
slguan 已提交
27
#include "tsched.h"
H
hjxilinx 已提交
28
#include "tschemautil.h"
H
hzcheng 已提交
29

30 31
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
32 33 34 35

static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)());

/*
 * Proxy functions that perform the query & retrieve operations sequentially.
 * If the SQL statement queries a super table and the two-stage merge procedure is not involved
 * (i.e. a projection query), data is queried and retrieved from all vnodes one after another.
 */
40 41
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
42

43
/*
 * Initialise pSql for an asynchronous statement on connection pObj and start it.
 *
 * pObj   - connection object stored in pSql->pTscObj
 * pSql   - sql object to set up; it is freed here if a buffer allocation fails
 * fp     - completion callback stored in pSql->fp
 * param  - opaque argument stored in pSql->param
 * sqlstr - statement text; sqlLen + 1 bytes are allocated for the private copy
 *
 * On allocation failure the error is reported through tscQueueAsyncError(fp, param).
 * A parse failure is reported through tscQueueAsyncRes(pSql).
 */
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen) {
  SSqlCmd *cmd = &pSql->cmd;
  SSqlRes *res = &pSql->res;

  // the self-referencing signature is what other entry points check to validate the object
  pSql->signature = pSql;
  pSql->pTscObj = pObj;
  pSql->fp = fp;
  pSql->param = param;

  if (tscAllocPayload(cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
    tscError("failed to malloc payload");
    tfree(pSql);
    tscQueueAsyncError(fp, param);
    return;
  }

  pSql->sqlstr = malloc(sqlLen + 1);
  if (NULL == pSql->sqlstr) {
    tscError("%p failed to malloc sql string buffer", pSql);
    tscQueueAsyncError(fp, param);
    free(cmd->payload);
    free(pSql);
    return;
  }

  res->qhandle = 0;
  res->numOfRows = 1;

  // strtolower() fills the private buffer from sqlstr (presumably a lower-cased copy -- confirm in tutil)
  strtolower(pSql->sqlstr, sqlstr);
  tscDump("%p pObj:%p, Async SQL: %s", pSql, pObj, pSql->sqlstr);

  int32_t parseCode = tsParseSql(pSql, true);

  if (parseCode == TSDB_CODE_ACTION_IN_PROGRESS) {
    // nothing more to do here; presumably the statement is resumed from a later callback
    return;
  }

  if (parseCode == TSDB_CODE_SUCCESS) {
    tscDoQuery(pSql);
    return;
  }

  // parsing failed: hand the error code to the user callback asynchronously
  res->code = (uint8_t)parseCode;
  tscQueueAsyncRes(pSql);
}

86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
// TODO return the correct error code to client in tscQueueAsyncError
/*
 * Execute a SQL statement asynchronously.
 *
 * taos   - connection handle; must be a live STscObj (its signature points to itself)
 * sqlstr - NUL-terminated statement text, at most tsMaxSQLStringLen bytes long
 * fp     - callback invoked with (param, result, code) when the statement completes or fails
 * param  - opaque pointer passed back to fp
 *
 * Every failure detected here (invalid connection, NULL or over-long statement,
 * out of memory) is reported through tscQueueAsyncError(fp, param).
 */
void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) {
  STscObj *pObj = (STscObj *)taos;
  if (pObj == NULL || pObj->signature != pObj) {
    tscError("bug!!! pObj:%p", pObj);
    globalCode = TSDB_CODE_DISCONNECTED;
    tscQueueAsyncError(fp, param);
    return;
  }

  // strlen(NULL) is undefined behaviour, so reject a missing statement explicitly
  if (sqlstr == NULL) {
    tscError("sql string is NULL");
    tscQueueAsyncError(fp, param);
    return;
  }

  // measure in size_t first: narrowing a huge length to int32_t could wrap negative and pass the limit check
  size_t len = strlen(sqlstr);
  if (len > (size_t)INT32_MAX || (int32_t)len > tsMaxSQLStringLen) {
    tscError("sql string too long");
    tscQueueAsyncError(fp, param);
    return;
  }

  int32_t sqlLen = (int32_t)len;

  taosNotePrintTsc(sqlstr);

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (pSql == NULL) {
    tscError("failed to malloc sqlObj");
    tscQueueAsyncError(fp, param);
    return;
  }

  doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen);
}

115
/*
 * Internal retrieve callback installed by taos_fetch_rows_a().
 *
 * A non-zero numOfRows is forwarded to the user callback (pSql->fetchFp). When
 * numOfRows is zero, the next vnode or the next sub-clause is tried first, and
 * only when neither is left is the user callback told that no rows remain.
 */
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
  if (tres == NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlCmd *cmd = &pSql->cmd;
  SSqlRes *res = &pSql->res;

  if (numOfRows != 0) {
    // local reducer has handle this situation during super table non-projection query.
    if (cmd->command != TSDB_SQL_RETRIEVE_METRIC) {
      res->numOfTotalInCurrentClause += res->numOfRows;
    }

    (*pSql->fetchFp)(param, tres, numOfRows);
    return;
  }

  // sequentially retrieve data from the remaining vnodes
  if (hasMoreVnodesToTry(pSql)) {
    tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
    return;
  }

  // every available vnode has been checked; continue with the next sub-clause, if any
  if (cmd->clauseIndex < cmd->numOfClause - 1) {
    tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
    return;
  }

  /*
   * 1. the limitation has been reached, or
   * 2. no vnode remains to be retrieved
   */
  (*pSql->fetchFp)(param, pSql, 0);
}

/*
 * Continue with the retrieve step after a query step has completed.
 *
 * param     - user argument forwarded on the error path
 * tres      - sql object of the running statement
 * numOfRows - outcome of the preceding step; non-zero is treated as an error for
 *             commands below TSDB_SQL_LOCAL
 * fp        - callback installed in pSql->fp before the retrieve is issued
 */
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)()) {
  SSqlObj *pSql = (SSqlObj *)tres;
  if (NULL == pSql) {  // error
    tscError("sql object is NULL");
    return;
  }

  SSqlRes *res = &pSql->res;
  SSqlCmd *cmd = &pSql->cmd;

  bool belowLocal = cmd->command < TSDB_SQL_LOCAL;

  if (belowLocal && (res->qhandle == 0 || numOfRows != 0)) {
    if (res->qhandle != 0) {
      res->code = numOfRows;
    } else {
      tscError("qhandle is NULL");
    }

    tscQueueAsyncError(pSql->fetchFp, param);
    return;
  }

  pSql->fp = fp;

  // switch the command to its retrieve/fetch counterpart before resubmitting
  if (belowLocal && cmd->command != TSDB_SQL_RETRIEVE_METRIC) {
    if (cmd->command > TSDB_SQL_MGMT) {
      cmd->command = TSDB_SQL_RETRIEVE;
    } else {
      cmd->command = TSDB_SQL_FETCH;
    }
  }

  tscProcessSql(pSql);
}

/*
185 186 187
 * retrieve callback for fetch rows proxy.
 * The below two functions both serve as the callback function of query virtual node.
 * query callback first, and then followed by retrieve callback
H
hzcheng 已提交
188
 */
189 190
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  // query completed, continue to retrieve
191
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
H
hzcheng 已提交
192 193
}

194 195
/*
 * Single-row counterpart of the query callback: once the query on the next vnode
 * has completed, continue with the retrieve step and deliver the data through
 * tscAsyncFetchSingleRowProxy.
 */
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  // the query step is done; retrieved rows go back through the single-row proxy
  void (*retrieveCallback)() = tscAsyncFetchSingleRowProxy;

  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, retrieveCallback);
}

/*
 * Asynchronously fetch the next batch of rows of a result set.
 *
 * taosa - result handle; must be a live SSqlObj whose query handle is set
 * fp    - user callback, stored in pSql->fetchFp
 * param - opaque pointer stored in pSql->param
 *
 * An invalid handle or a missing query handle is reported through tscQueueAsyncError(fp, param).
 */
void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;

  bool validHandle = (pSql != NULL) && (pSql->signature == pSql);
  if (!validHandle) {
    tscError("sql object is NULL");
    globalCode = TSDB_CODE_DISCONNECTED;
    tscQueueAsyncError(fp, param);
    return;
  }

  SSqlCmd *cmd = &pSql->cmd;
  SSqlRes *res = &pSql->res;

  if (0 == res->qhandle) {
    tscError("qhandle is NULL");
    tscQueueAsyncError(fp, param);
    return;
  }

  // the user callback lives in fetchFp; the internal proxy is what the retrieve reports to
  pSql->fetchFp = fp;
  pSql->fp = tscAsyncFetchRowsProxy;
  pSql->param = param;

  tscResetForNextRetrieve(res);

  // switch the command to its retrieve/fetch counterpart before resubmitting
  if (cmd->command < TSDB_SQL_LOCAL && cmd->command != TSDB_SQL_RETRIEVE_METRIC) {
    if (cmd->command > TSDB_SQL_MGMT) {
      cmd->command = TSDB_SQL_RETRIEVE;
    } else {
      cmd->command = TSDB_SQL_FETCH;
    }
  }

  tscProcessSql(pSql);
}

/*
 * Asynchronously fetch a single row of a result set.
 *
 * taosa - result handle; must be a live SSqlObj whose query handle is set
 * fp    - user callback receiving (param, result, row), stored in pSql->fetchFp
 * param - opaque pointer stored in pSql->param
 *
 * When the rows of the current batch are used up (res.row >= res.numOfRows) a
 * new retrieve is issued and the row is delivered by tscAsyncFetchSingleRowProxy.
 * Otherwise a task is scheduled that hands out the next row of the current batch.
 * An invalid handle or a missing query handle is reported through tscQueueAsyncError(fp, param).
 */
void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
    globalCode = TSDB_CODE_DISCONNECTED;
    tscQueueAsyncError(fp, param);
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
    tscQueueAsyncError(fp, param);
    return;
  }

  pSql->fetchFp = fp;
  pSql->param = param;

  if (pRes->row >= pRes->numOfRows) {
    tscResetForNextRetrieve(pRes);
    pSql->fp = tscAsyncFetchSingleRowProxy;

    // switch the command to its retrieve/fetch counterpart before resubmitting
    if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
      pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }

    tscProcessSql(pSql);
  } else {
    // zero-initialise so that any SSchedMsg member not assigned below is not left indeterminate
    SSchedMsg schedMsg = {0};
    schedMsg.fp = tscProcessFetchRow;
    schedMsg.ahandle = pSql;
    schedMsg.thandle = pRes->tsrow;
    schedMsg.msg = NULL;
    taosScheduleTask(tscQhandle, &schedMsg);
  }
}

271
/*
 * Internal retrieve callback installed by taos_fetch_row_a().
 *
 * param     - unused; the user argument is taken from pSql->param
 * tres      - sql object of the running statement; a NULL handle is ignored,
 *             as in tscAsyncFetchRowsProxy
 * numOfRows - outcome of the retrieve: > 0 rows are available, 0 the current
 *             vnode is exhausted, < 0 the retrieve failed
 *
 * The user callback (pSql->fetchFp) receives the next row, or NULL when no
 * further row can be delivered.
 */
void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
  if (tres == NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  if (numOfRows < 0) {
    // the retrieve failed: there is no valid row to hand out, so do not build one from the result buffer
    (*pSql->fetchFp)(pSql->param, pSql, NULL);
    return;
  }

  if (numOfRows == 0) {
    if (hasMoreVnodesToTry(pSql)) {     // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode);
    } else {
      /*
       * 1. has reach the limitation
       * 2. no remain virtual nodes to be retrieved anymore
       */
      (*pSql->fetchFp)(pSql->param, pSql, NULL);
    }
    return;
  }

  // point every output column at its value for the current row
  for (int i = 0; i < pCmd->numOfCols; ++i){
    SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i];
    if (pExpr != NULL) {
      pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pExpr->resBytes * pRes->row;
    } else {
      //todo add
    }
  }

  pRes->row++;

  (*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow);
}

/*
 * Scheduler task queued by taos_fetch_row_a() while the current batch still has
 * rows: fill pRes->tsrow for row pRes->row, advance the row cursor and pass the
 * row to the user callback (pSql->fetchFp).
 */
void tscProcessFetchRow(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

  int col = 0;
  while (col < pCmd->numOfCols) {
    SSqlExpr *expr = pQueryInfo->fieldsInfo.pSqlExpr[col];

    if (expr == NULL) {
      //todo add
    } else {
      pRes->tsrow[col] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, col) + expr->resBytes * pRes->row;
    }

    ++col;
  }

  pRes->row++;
  (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
}

/*
 * Scheduler task queued by tscQueueAsyncRes(): deliver the outcome of an async
 * operation to pSql->fp.
 *
 * The value passed to the callback is -res.code when an error code is set and
 * res.numOfRows otherwise. When tscShouldFreeAsyncSqlObj() says so, the sql
 * object is released after the callback returns.
 */
void tscProcessAsyncRes(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  SSqlRes *pRes = &pSql->res;

  // read everything needed from the object before the callback runs (pCmd may be released)
  int  command = pSql->cmd.command;
  int  result = pRes->code ? -pRes->code : pRes->numOfRows;
  bool freeAfterCallback = tscShouldFreeAsyncSqlObj(pSql);

  // in case of async insert, restore the user specified callback function
  if (TSDB_SQL_INSERT == command) {
    assert(pSql->fp != NULL);
    pSql->fp = pSql->fetchFp;
  }

  (*pSql->fp)(pSql->param, (void *)pSql, result);

  if (!freeAfterCallback) {
    return;
  }

  tscFreeSqlObj(pSql);
  tscTrace("%p Async sql is automatically freed in async res", pSql);
}

/*
 * Scheduler task queued by tscQueueAsyncError(): invoke the callback stored in
 * pMsg->ahandle with (pMsg->thandle, NULL, -1) to signal a failure.
 *
 * A NULL callback is logged and skipped instead of being called.
 */
void tscProcessAsyncError(SSchedMsg *pMsg) {
  void (*fp)() = pMsg->ahandle;

  // callers such as taos_query_a() forward the user callback unchecked, so it may be NULL
  if (fp == NULL) {
    tscError("async error callback is NULL");
    return;
  }

  (*fp)(pMsg->thandle, NULL, -1);
}

/*
 * Schedule tscProcessAsyncError on the tscQhandle queue.
 *
 * fp    - callback to be invoked with an error indication
 * param - opaque pointer passed to fp as its first argument
 */
void tscQueueAsyncError(void(*fp), void *param) {
  // zero-initialise so that any SSchedMsg member not assigned below is not left indeterminate
  SSchedMsg schedMsg = {0};
  schedMsg.fp = tscProcessAsyncError;
  schedMsg.ahandle = fp;
  schedMsg.thandle = param;
  schedMsg.msg = NULL;
  taosScheduleTask(tscQhandle, &schedMsg);
}

/*
 * Schedule tscProcessAsyncRes for pSql on the tscQhandle queue.
 *
 * Nothing is queued when pSql is NULL or its signature no longer points to
 * itself (the object is considered freed).
 */
void tscQueueAsyncRes(SSqlObj *pSql) {
  if (pSql == NULL || pSql->signature != pSql) {
    tscTrace("%p SqlObj is freed, not add into queue async res", pSql);
    return;
  }

  tscError("%p add into queued async res, code:%d", pSql, pSql->res.code);

  // zero-initialise so that any SSchedMsg member not assigned below is not left indeterminate
  SSchedMsg schedMsg = {0};
  schedMsg.fp = tscProcessAsyncRes;
  schedMsg.ahandle = pSql;
  schedMsg.thandle = (void *)1;
  schedMsg.msg = NULL;
  taosScheduleTask(tscQhandle, &schedMsg);
}

/*
 * Scheduler task queued by tscQueueAsyncFreeResult(): release the sql object
 * carried in pMsg->ahandle through taos_free_result().
 */
void tscProcessAsyncFree(SSchedMsg *pMsg) {
  SSqlObj *sqlObj = (SSqlObj *)pMsg->ahandle;

  tscTrace("%p sql is freed", sqlObj);
  taos_free_result(sqlObj);
}

/*
 * Schedule tscProcessAsyncFree for pSql on the tscQhandle queue, so that the
 * object is released from a scheduler task instead of the calling context.
 */
void tscQueueAsyncFreeResult(SSqlObj *pSql) {
  tscTrace("%p sqlObj put in queue to async free", pSql);

  // zero-initialise so that any SSchedMsg member not assigned below is not left indeterminate
  SSchedMsg schedMsg = {0};
  schedMsg.fp = tscProcessAsyncFree;
  schedMsg.ahandle = pSql;
  schedMsg.thandle = (void *)1;
  schedMsg.msg = NULL;
  taosScheduleTask(tscQhandle, &schedMsg);
}

/*
 * Callback that drives an asynchronous insert spanning several data blocks.
 *
 * param     - the SSqlObj of the insert (not the user argument)
 * tres      - result handle forwarded to the user callback on completion
 * numOfRows - value forwarded to the user callback on completion
 *
 * Each call submits the next data block, indexed by pTableMetaInfo->vnodeIndex.
 * Once the index has passed the last block the block list is destroyed and the
 * user callback (pSql->fetchFp) is invoked.
 */
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows) {
  SSqlObj *pSql = (SSqlObj *)param;
  SSqlCmd *pCmd = &pSql->cmd;

  assert(pCmd->dataSourceType != 0 && pSql->signature == pSql);

  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);

  STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
  assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2);

  SDataBlockList *pBlocks = pCmd->pDataBlocks;

  if (pBlocks == NULL || pTableMetaInfo->vnodeIndex >= pBlocks->nSize) {
    // restore user defined fp
    pSql->fp = pSql->fetchFp;
    tscTrace("%p Async insertion completed, destroy data block list", pSql);

    // release data block data
    pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);

    // all data has been sent to vnode, call user function
    (*pSql->fp)(pSql->param, tres, numOfRows);
    return;
  }

  // copy the next block into the payload; a block that fails to copy is skipped and the following one is tried
  int32_t code = TSDB_CODE_SUCCESS;
  for (;;) {
    code = tscCopyDataBlockToPayload(pSql, pBlocks->pData[pTableMetaInfo->vnodeIndex++]);
    if (code == TSDB_CODE_SUCCESS) {
      break;
    }

    tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d",
             pSql, pTableMetaInfo->vnodeIndex - 1, pBlocks->nSize, code);

    if (pTableMetaInfo->vnodeIndex >= pBlocks->nSize) {
      break;
    }
  }

  // build submit msg may fail
  // NOTE(review): when every remaining block fails, nothing is submitted and no callback is invoked from here -- confirm this is intended
  if (code == TSDB_CODE_SUCCESS) {
    tscTrace("%p async insertion, vnodeIdx:%d, total:%d", pSql, pTableMetaInfo->vnodeIndex - 1, pBlocks->nSize);
    tscProcessSql(pSql);
  }
}

int tscSendMsgToServer(SSqlObj *pSql);

/*
 * Callback invoked when a table-meta request issued on behalf of a sql object
 * has completed.
 *
 * param - the SSqlObj that requested the meta; ignored if NULL or no longer valid
 * res   - unused
 * code  - 0 on success, otherwise an error code (its absolute value is stored)
 *
 * Three situations are handled:
 *   1. pSql->fp == (void *)1: a meta renewal whose waiter is released through
 *      pSql->rspSem on failure; on success the message is sent to the server.
 *   2. pSql->pStream == NULL: a normal async query, or a sub-query of a super
 *      table query.
 *   3. pSql->pStream != NULL: stream computing.
 * Whenever a nested call returns TSDB_CODE_ACTION_IN_PROGRESS this function
 * returns immediately; any other failure is reported through tscQueueAsyncRes().
 */
void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
  SSqlObj *pSql = (SSqlObj *)param;
  if (pSql == NULL || pSql->signature != pSql) return;

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  if (pSql->fp == NULL) {
    tscError("%p callBack is NULL!!!", pSql);
    return;
  }

  // fp == 1 marks a meta renewal; the outcome is signalled through rspSem instead of a callback
  if (pSql->fp == (void *)1) {
    pSql->fp = NULL;

    if (code != 0) {
      code = abs(code);
      pRes->code = code;
      tscTrace("%p failed to renew tableMeta", pSql);
      tsem_post(&pSql->rspSem);
    } else {
      tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d",
          pSql, pSql->cmd.command, pSql->res.code, pSql->retry);

      STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
      assert(pTableMetaInfo->pTableMeta == NULL);

      tscGetMeterMeta(pSql, pTableMetaInfo);
      code = tscSendMsgToServer(pSql);
      if (code != 0) {
        pRes->code = code;
        tsem_post(&pSql->rspSem);
      }
    }

    return;
  }

  if (code != 0) {
    pRes->code = (uint8_t)abs(code);
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pSql->pStream == NULL) {
    // check if it is a sub-query of super table query first, if true, enter another routine
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

    if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
      STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
      assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vnodeIndex >= 0 && pSql->param != NULL);

      SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
      SSqlObj *         pParObj = trs->pParentSqlObj;

      assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vnodeIndex &&
          tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);

      tscTrace("%p get metricMeta during super table query successfully", pSql);

      code = tscGetMeterMeta(pSql, pTableMetaInfo);
      pRes->code = code;

      if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;

      // only continue when the table meta was obtained; otherwise the failure code
      // would be overwritten by the next call (the stream branch below guards the same way)
      if (code == TSDB_CODE_SUCCESS) {
        code = tscGetMetricMeta(pSql, 0);
        pRes->code = code;

        if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
      }
    } else {  // normal async query continues
      if (pCmd->isParseFinish) {
        tscTrace("%p resend data to vnode in metermeta callback since sql has been parsed completed", pSql);

        STableMetaInfo* pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
        code = tscGetMeterMeta(pSql, pTableMetaInfo);
        assert(code == TSDB_CODE_SUCCESS);

        if (pTableMetaInfo->pTableMeta) {
          code = tscSendMsgToServer(pSql);
          if (code == TSDB_CODE_SUCCESS) return;
        }
      } else {
        // parsing was suspended while the meta was fetched; resume it
        code = tsParseSql(pSql, false);
        if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
      }
    }

  } else {  // stream computing
    STableMetaInfo *pTableMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
    code = tscGetMeterMeta(pSql, pTableMetaInfo);
    pRes->code = code;

    if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;

    if (code == TSDB_CODE_SUCCESS && UTIL_METER_IS_SUPERTABLE(pTableMetaInfo)) {
      code = tscGetMetricMeta(pSql, pCmd->clauseIndex);
      pRes->code = code;

      if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    pSql->res.code = code;
    tscQueueAsyncRes(pSql);
    return;
  }

  if (pSql->pStream) {
    tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
    /*
     * NOTE:
     * transfer the sql function for super table query before get meter/metric meta,
     * since in callback functions, only tscProcessSql(pStream->pSql) is executed!
     */
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);

    tscTansformSQLFunctionForSTableQuery(pQueryInfo);
    tscIncStreamExecutionCount(pSql->pStream);
  } else {
    tscTrace("%p get tableMeta/metricMeta successfully", pSql);
  }

  tscDoQuery(pSql);
}