tscAsync.c 17.2 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "os.h"
H
hjxilinx 已提交
17 18 19
#include "tutil.h"

#include "tnote.h"
H
hzcheng 已提交
20
#include "trpc.h"
H
Haojun Liao 已提交
21
#include "tcache.h"
S
slguan 已提交
22
#include "tscLog.h"
H
hjxilinx 已提交
23
#include "tscSubquery.h"
24
#include "tscLocalMerge.h"
H
hzcheng 已提交
25
#include "tscUtil.h"
S
slguan 已提交
26
#include "tsched.h"
H
hjxilinx 已提交
27
#include "tschemautil.h"
H
hjxilinx 已提交
28
#include "tsclient.h"
H
hzcheng 已提交
29

30 31
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
32 33 34 35

static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)());

/*
36 37 38
 * Proxy function to perform sequentially query&retrieve operation.
 * If sql queries upon a super table and two-stage merge procedure is not involved (when employ the projection
 * query), it will sequentially query&retrieve data for all vnodes
H
hzcheng 已提交
39
 */
40 41
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
42

H
Haojun Liao 已提交
43
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) {
H
Haojun Liao 已提交
44 45
  SSqlCmd* pCmd = &pSql->cmd;

H
hzcheng 已提交
46
  pSql->signature = pSql;
H
hjxilinx 已提交
47 48
  pSql->param     = param;
  pSql->pTscObj   = pObj;
49
  pSql->parseRetry= 0;
50
  pSql->maxRetry  = TSDB_MAX_REPLICA;
51
  pSql->fp        = fp;
H
Haojun Liao 已提交
52
  pSql->fetchFp   = fp;
H
Haojun Liao 已提交
53

54
  registerSqlObj(pSql);
H
Haojun Liao 已提交
55

H
Haojun Liao 已提交
56
  pSql->sqlstr = calloc(1, sqlLen + 1);
H
hzcheng 已提交
57 58
  if (pSql->sqlstr == NULL) {
    tscError("%p failed to malloc sql string buffer", pSql);
H
Haojun Liao 已提交
59 60
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
61 62
    return;
  }
H
Haojun Liao 已提交
63

S
Shengliang Guan 已提交
64
  strntolower(pSql->sqlstr, sqlstr, (int32_t)sqlLen);
H
Haojun Liao 已提交
65

S
Shengliang Guan 已提交
66
  tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
H
Haojun Liao 已提交
67 68
  pCmd->curSql = pSql->sqlstr;

H
Haojun Liao 已提交
69
  int32_t code = tsParseSql(pSql, true);
70
  if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
71
  
H
hzcheng 已提交
72
  if (code != TSDB_CODE_SUCCESS) {
H
[td-99]  
hjxilinx 已提交
73
    pSql->res.code = code;
H
hzcheng 已提交
74 75 76
    tscQueueAsyncRes(pSql);
    return;
  }
H
Haojun Liao 已提交
77

H
hzcheng 已提交
78 79 80
  tscDoQuery(pSql);
}

81
// TODO return the correct error code to client in tscQueueAsyncError
H
hjxilinx 已提交
82
void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
83 84 85
  STscObj *pObj = (STscObj *)taos;
  if (pObj == NULL || pObj->signature != pObj) {
    tscError("bug!!! pObj:%p", pObj);
86 87
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
88 89 90
    return;
  }
  
S
Shengliang Guan 已提交
91
  int32_t sqlLen = (int32_t)strlen(sqlstr);
92
  if (sqlLen > tsMaxSQLStringLen) {
H
Haojun Liao 已提交
93
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
94 95
    terrno = TSDB_CODE_TSC_INVALID_SQL;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_INVALID_SQL);
96 97 98 99 100 101 102 103
    return;
  }
  
  taosNotePrintTsc(sqlstr);
  
  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (pSql == NULL) {
    tscError("failed to malloc sqlObj");
104
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
105 106 107 108 109 110
    return;
  }
  
  doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen);
}

111
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
H
hzcheng 已提交
112 113 114 115 116 117 118 119
  if (tres == NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

120 121 122 123
  if (numOfRows == 0) {
    if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
    } else {
124 125 126 127 128 129 130 131 132
      /*
       * all available virtual node has been checked already, now we need to check
       * for the next subclause queries
       */
      if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
        tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
        return;
      }

133 134 135 136 137
      /*
       * 1. has reach the limitation
       * 2. no remain virtual nodes to be retrieved anymore
       */
      (*pSql->fetchFp)(param, pSql, 0);
H
hzcheng 已提交
138
    }
139 140 141 142
    
    return;
  }
  
143
  // local merge has handle this situation during super table non-projection query.
H
hjxilinx 已提交
144
  if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
H
Haojun Liao 已提交
145
    pRes->numOfClauseTotal += pRes->numOfRows;
H
hzcheng 已提交
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
  }

  (*pSql->fetchFp)(param, tres, numOfRows);
}

// actual continue retrieve function with user-specified callback function
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)()) {
  SSqlObj *pSql = (SSqlObj *)tres;
  if (pSql == NULL) {  // error
    tscError("sql object is NULL");
    return;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

162
  if ((pRes->qhandle == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) {
163
    if (pRes->qhandle == 0 && numOfRows != 0) {
H
hzcheng 已提交
164 165 166 167 168
      tscError("qhandle is NULL");
    } else {
      pRes->code = numOfRows;
    }

H
Haojun Liao 已提交
169
    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
170 171 172 173
    return;
  }

  pSql->fp = fp;
H
hjxilinx 已提交
174
  if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
H
hzcheng 已提交
175 176
    pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
  }
H
Haojun Liao 已提交
177

H
Haojun Liao 已提交
178 179 180 181 182
  if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
    tscFetchDatablockFromSubquery(pSql);
  } else {
    tscProcessSql(pSql);
  }
H
hzcheng 已提交
183 184 185
}

/*
186 187 188
 * retrieve callback for fetch rows proxy.
 * The below two functions both serve as the callback function of query virtual node.
 * query callback first, and then followed by retrieve callback
H
hzcheng 已提交
189
 */
190 191
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  // query completed, continue to retrieve
192
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
H
hzcheng 已提交
193 194
}

195 196
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  // query completed, continue to retrieve
197
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy);
H
hzcheng 已提交
198 199
}

H
Haojun Liao 已提交
200
void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
H
hzcheng 已提交
201 202 203
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
204
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
H
hzcheng 已提交
205 206 207 208 209 210
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

211 212 213 214
  // user-defined callback function is stored in fetchFp
  pSql->fetchFp = fp;
  pSql->fp = tscAsyncFetchRowsProxy;

H
hzcheng 已提交
215 216
  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
H
Haojun Liao 已提交
217
    pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
H
Haojun Liao 已提交
218 219
    pSql->param = param;

H
Haojun Liao 已提交
220
    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
221 222 223 224
    return;
  }

  pSql->param = param;
S
slguan 已提交
225
  tscResetForNextRetrieve(pRes);
H
hjxilinx 已提交
226 227
  
  // handle the sub queries of join query
228
  if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
H
hjxilinx 已提交
229
    tscFetchDatablockFromSubquery(pSql);
230
  } else if (pRes->completed) {
231
    if(pCmd->command == TSDB_SQL_FETCH || (pCmd->command >= TSDB_SQL_SERV_STATUS && pCmd->command <= TSDB_SQL_CURRENT_USER)) {
232 233 234 235
      if (hasMoreVnodesToTry(pSql)) {  // sequentially retrieve data from remain vnodes.
        tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
      } else {
        /*
236 237
         * all available virtual nodes in current clause has been checked already, now try the
         * next one in the following union subclause
238 239 240 241 242 243 244
         */
        if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
          tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
          return;
        }

        /*
245 246
         * 1. has reach the limitation
         * 2. no remain virtual nodes to be retrieved anymore
247 248 249
         */
        (*pSql->fetchFp)(param, pSql, 0);
      }
250

251
      return;
B
Bomin Zhang 已提交
252
    } else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE) {
253
      // in case of show command, return no data
254
      (*pSql->fetchFp)(param, pSql, 0);
255 256
    } else {
      assert(0);
257 258
    }
  } else { // current query is not completed, continue retrieve from node
H
hjxilinx 已提交
259
    if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
H
hjxilinx 已提交
260 261 262 263
      pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }
  
    tscProcessSql(pSql);
H
hzcheng 已提交
264 265 266 267 268 269 270
  }
}

void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), void *param) {
  SSqlObj *pSql = (SSqlObj *)taosa;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
271
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
H
hzcheng 已提交
272 273 274 275 276 277 278 279
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (pRes->qhandle == 0) {
    tscError("qhandle is NULL");
H
Haojun Liao 已提交
280 281 282 283
    pSql->param = param;
    pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;

    tscQueueAsyncRes(pSql);
H
hzcheng 已提交
284 285 286 287 288
    return;
  }

  pSql->fetchFp = fp;
  pSql->param = param;
289
  
H
hzcheng 已提交
290
  if (pRes->row >= pRes->numOfRows) {
S
slguan 已提交
291
    tscResetForNextRetrieve(pRes);
292
    pSql->fp = tscAsyncFetchSingleRowProxy;
293
    
H
hjxilinx 已提交
294
    if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
295 296 297
      pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }
    
H
hzcheng 已提交
298 299
    tscProcessSql(pSql);
  } else {
B
Bomin Zhang 已提交
300
    SSchedMsg schedMsg = { 0 };
H
hzcheng 已提交
301 302 303 304 305 306 307 308
    schedMsg.fp = tscProcessFetchRow;
    schedMsg.ahandle = pSql;
    schedMsg.thandle = pRes->tsrow;
    schedMsg.msg = NULL;
    taosScheduleTask(tscQhandle, &schedMsg);
  }
}

309
void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
H
hzcheng 已提交
310 311 312 313
  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

314
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
315
  
H
hzcheng 已提交
316
  if (numOfRows == 0) {
317 318 319
    if (hasMoreVnodesToTry(pSql)) {     // sequentially retrieve data from remain vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode);
    } else {
H
hzcheng 已提交
320
      /*
321 322
       * 1. has reach the limitation
       * 2. no remain virtual nodes to be retrieved anymore
H
hzcheng 已提交
323 324 325
       */
      (*pSql->fetchFp)(pSql->param, pSql, NULL);
    }
326
    return;
H
hzcheng 已提交
327
  }
328
  
329
  for (int i = 0; i < pCmd->numOfCols; ++i){
H
hjxilinx 已提交
330 331 332
    SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);
    if (pSup->pSqlExpr != NULL) {
//      pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row;
333 334 335 336 337
    } else {
      //todo add
    }
  }
  
338 339 340
  pRes->row++;

  (*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow);
H
hzcheng 已提交
341 342 343 344 345 346
}

void tscProcessFetchRow(SSchedMsg *pMsg) {
  SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;
347
  
H
hjxilinx 已提交
348
  SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
hzcheng 已提交
349

350
  for (int i = 0; i < pCmd->numOfCols; ++i) {
H
hjxilinx 已提交
351 352 353
    SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);

    if (pSup->pSqlExpr != NULL) {
354
      tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i);
355
    } else {
H
hjxilinx 已提交
356
//      todo add
357
    }
358 359
  }
  
H
hzcheng 已提交
360 361 362 363
  pRes->row++;
  (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
}

H
Haojun Liao 已提交
364
// this function will be executed by queue task threads, so the terrno is not valid
H
[td-99]  
hjxilinx 已提交
365
static void tscProcessAsyncError(SSchedMsg *pMsg) {
H
hzcheng 已提交
366
  void (*fp)() = pMsg->ahandle;
H
Haojun Liao 已提交
367
  terrno = *(int32_t*) pMsg->msg;
H
[td-99]  
hjxilinx 已提交
368
  (*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg);
H
hzcheng 已提交
369 370
}

H
[td-99]  
hjxilinx 已提交
371 372 373 374
void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
  int32_t* c = malloc(sizeof(int32_t));
  *c = code;
  
B
Bomin Zhang 已提交
375
  SSchedMsg schedMsg = { 0 };
H
hzcheng 已提交
376 377 378
  schedMsg.fp = tscProcessAsyncError;
  schedMsg.ahandle = fp;
  schedMsg.thandle = param;
H
[td-99]  
hjxilinx 已提交
379
  schedMsg.msg = c;
H
hzcheng 已提交
380 381 382 383 384
  taosScheduleTask(tscQhandle, &schedMsg);
}

void tscQueueAsyncRes(SSqlObj *pSql) {
  if (pSql == NULL || pSql->signature != pSql) {
385
    tscDebug("%p SqlObj is freed, not add into queue async res", pSql);
H
hzcheng 已提交
386 387 388
    return;
  }

H
Haojun Liao 已提交
389
  tscError("%p add into queued async res, code:%s", pSql, tstrerror(pSql->res.code));
H
hzcheng 已提交
390

H
Haojun Liao 已提交
391 392 393 394 395
  SSqlRes *pRes = &pSql->res;
  assert(pSql->fp != NULL && pSql->fetchFp != NULL);

  pSql->fp = pSql->fetchFp;
  (*pSql->fp)(pSql->param, pSql, pRes->code);
H
hzcheng 已提交
396 397 398 399
}

int tscSendMsgToServer(SSqlObj *pSql);

H
hjxilinx 已提交
400
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
H
hzcheng 已提交
401 402 403 404 405
  SSqlObj *pSql = (SSqlObj *)param;
  if (pSql == NULL || pSql->signature != pSql) return;

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
H
Haojun Liao 已提交
406
  pRes->code = code;
H
hzcheng 已提交
407

H
Haojun Liao 已提交
408
  const char* msg = (pCmd->command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta";
H
[td-32]  
hjxilinx 已提交
409
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
410
    tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code));
H
Haojun Liao 已提交
411 412
    goto _error;
  } else {
413
    tscDebug("%p get %s successfully", pSql, msg);
H
hzcheng 已提交
414 415 416
  }

  if (pSql->pStream == NULL) {
H
hjxilinx 已提交
417
    SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
H
Haojun Liao 已提交
418 419 420

    // check if it is a sub-query of super table query first, if true, enter another routine
    if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY)) {
421
      tscDebug("%p update table meta in local cache, continue to process sql and send corresponding subquery", pSql);
H
Haojun Liao 已提交
422

H
hjxilinx 已提交
423
      STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
dengyihao's avatar
dengyihao 已提交
424 425 426 427 428
      code = tscGetTableMeta(pSql, pTableMetaInfo);
      if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        return;
      } else {
        assert(code == TSDB_CODE_SUCCESS);      
H
Haojun Liao 已提交
429
      }
dengyihao's avatar
dengyihao 已提交
430
     
431
      assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pSql->param != NULL);
H
hzcheng 已提交
432 433

      SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
434
      SSqlObj *         pParObj = trs->pParentSql;
435
      
436
      // NOTE: the vgroupInfo for the queried super table must be existed here.
H
hjxilinx 已提交
437
      assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
438
          pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL);
S
slguan 已提交
439

H
Haojun Liao 已提交
440 441 442
      // tscProcessSql can add error into async res
      tscProcessSql(pSql);
      return;
H
Haojun Liao 已提交
443
    } else {  // continue to process normal async query
444
      if (pCmd->parseFinished) {
445
        tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
H
Haojun Liao 已提交
446

447
        STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hjxilinx 已提交
448
        code = tscGetTableMeta(pSql, pTableMetaInfo);
dengyihao's avatar
dengyihao 已提交
449 450 451 452 453
        if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
          return;
        } else {
          assert(code == TSDB_CODE_SUCCESS);      
        }
H
Haojun Liao 已提交
454

455
        // in case of insert, redo parsing the sql string and build new submit data block for two reasons:
456
        // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
457
        // 2. vnode may need the schema information along with submit block to update its local table schema.
H
Haojun Liao 已提交
458
        if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) {
459 460
          tscDebug("%p redo parse sql string and proceed", pSql);
          pCmd->parseFinished = false;
B
Bomin Zhang 已提交
461
          tscResetSqlCmdObj(pCmd, false);
462 463 464 465 466 467 468 469 470

          code = tsParseSql(pSql, true);

          if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
            return;
          } else if (code != TSDB_CODE_SUCCESS) {
            goto _error;
          }

H
Haojun Liao 已提交
471 472 473 474 475 476 477 478 479 480 481
          if (pCmd->command == TSDB_SQL_INSERT) {
            /*
             * Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
             * and send the required submit block according to index value in supporter to server.
             */
            pSql->fp = pSql->fetchFp;  // restore the fp
            tscHandleInsertRetry(pSql);
          } else if (pCmd->command == TSDB_SQL_SELECT) {  // in case of other query type, continue
            tscProcessSql(pSql);
          }
        }else {  // in all other cases, simple retry
H
Haojun Liao 已提交
482
          tscProcessSql(pSql);
L
[#1083]  
lihui 已提交
483
        }
484

H
Haojun Liao 已提交
485
        return;
L
[#1083]  
lihui 已提交
486
      } else {
487
        tscDebug("%p continue parse sql after get table meta", pSql);
H
Haojun Liao 已提交
488

H
hjxilinx 已提交
489
        code = tsParseSql(pSql, false);
490 491 492 493 494 495
        if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
          return;
        } else if (code != TSDB_CODE_SUCCESS) {
          goto _error;
        }

H
Haojun Liao 已提交
496
        if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) {
H
Hui Li 已提交
497 498
          STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
          code = tscGetTableMeta(pSql, pTableMetaInfo);
dengyihao's avatar
dengyihao 已提交
499 500 501 502 503
          if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
            return;
          } else {
            assert(code == TSDB_CODE_SUCCESS);      
          }
504

H
Hui Li 已提交
505
          (*pSql->fp)(pSql->param, pSql, code);
H
Hui Li 已提交
506 507
          return;
        }
508 509

        // proceed to invoke the tscDoQuery();
L
[#1083]  
lihui 已提交
510
      }
H
hzcheng 已提交
511
    }
S
slguan 已提交
512

H
hzcheng 已提交
513
  } else {  // stream computing
514
    STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
H
hzcheng 已提交
515

516 517 518 519 520 521
    code = tscGetTableMeta(pSql, pTableMetaInfo);
    if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
      return;
    } else if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
H
hzcheng 已提交
522

H
Haojun Liao 已提交
523
    if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
H
hjxilinx 已提交
524
      code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
525 526 527 528 529
      if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        return;
      } else if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }
H
hzcheng 已提交
530 531
    }

532
    tscDebug("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
533 534 535
    if (!pSql->cmd.parseFinished) {
      tsParseSql(pSql, false);
    }
B
Bomin Zhang 已提交
536
    (*pSql->fp)(pSql->param, pSql, code);
537

538
    return;
H
hzcheng 已提交
539 540 541
  }

  tscDoQuery(pSql);
542 543 544 545 546 547 548
  return;

  _error:
  if (code != TSDB_CODE_SUCCESS) {
    pSql->res.code = code;
    tscQueueAsyncRes(pSql);
  }
H
hzcheng 已提交
549
}