tscAsync.c 14.4 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "os.h"
H
hjxilinx 已提交
17 18 19
#include "tutil.h"

#include "tnote.h"
H
hzcheng 已提交
20
#include "trpc.h"
S
slguan 已提交
21
#include "tscLog.h"
H
hjxilinx 已提交
22
#include "tscSubquery.h"
H
hzcheng 已提交
23
#include "tscUtil.h"
S
slguan 已提交
24
#include "tsched.h"
H
Haojun Liao 已提交
25
#include "qTableMeta.h"
H
hjxilinx 已提交
26
#include "tsclient.h"
H
hzcheng 已提交
27

28
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);

/*
 * Proxy function that performs the query & retrieve operations sequentially.
 * If the sql statement queries a super table and the two-stage merge procedure is not involved
 * (i.e. a projection query is employed), it sequentially queries & retrieves data from all vnodes.
 */
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
H
hzcheng 已提交
36

H
Haojun Liao 已提交
37
/*
 * Initialize a freshly allocated sql object and start the asynchronous execution of a statement.
 *
 * pObj    connection object, stored in pSql->pTscObj
 * pSql    sql object to initialize; it is registered through registerSqlObj()
 * fp      user callback, installed both as pSql->fp and pSql->fetchFp
 * param   opaque user argument, stored in pSql->param
 * sqlstr  statement text; sqlLen bytes of it are copied into a private buffer via strntolower()
 * sqlLen  length of sqlstr in bytes (without the terminator)
 *
 * Failures are not returned to the caller: the error code is stored in pSql->res.code and
 * reported through tscAsyncResultOnError().
 */
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) {
  SSqlCmd* pCmd = &pSql->cmd;

  pSql->signature = pSql;
  pSql->param     = param;
  pSql->pTscObj   = pObj;
  pSql->parseRetry= 0;
  pSql->maxRetry  = TSDB_MAX_REPLICA;
  pSql->fp        = fp;
  pSql->fetchFp   = fp;
  pSql->rootObj   = pSql;

  registerSqlObj(pSql);

  // one extra zeroed byte keeps the copied statement terminated
  pSql->sqlstr = calloc(1, sqlLen + 1);
  if (pSql->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
    pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscAsyncResultOnError(pSql);
    return;
  }

  strntolower(pSql->sqlstr, sqlstr, (int32_t)sqlLen);

  tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
  pCmd->resColumnId = TSDB_RES_COL_ID;

  // hold a reference while parsing/executing; every exit path below releases it
  taosAcquireRef(tscObjRef, pSql->self);

  int32_t code = tsParseSql(pSql, true);

  // parsing is still in progress: nothing more to do here
  if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
    taosReleaseRef(tscObjRef, pSql->self);
    return;
  }

  if (code != TSDB_CODE_SUCCESS) {
    pSql->res.code = code;
    tscAsyncResultOnError(pSql);
    taosReleaseRef(tscObjRef, pSql->self);
    return;
  }

  SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
  executeQuery(pSql, pQueryInfo);
  taosReleaseRef(tscObjRef, pSql->self);
}

85
// TODO return the correct error code to client in tscQueueAsyncError
H
hjxilinx 已提交
86
/*
 * Asynchronous query entry point: forwards to taos_query_ra() and discards the
 * returned result handle.
 */
void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
  taos_query_ra(taos, sqlstr, fp, param);
}

/*
 * Asynchronous query that also returns the newly created result handle.
 *
 * taos    connection handle; must be a live STscObj (its signature is checked)
 * sqlstr  statement text; it is passed to strlen() unchecked, so it must not be NULL
 * fp      callback that receives the result or the error
 * param   opaque user argument handed back to fp
 *
 * Returns the new sql object, or NULL when the request is rejected up front. In that
 * case terrno is set and the error is also queued for fp through tscQueueAsyncError().
 */
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
  STscObj *pObj = (STscObj *)taos;
  if (pObj == NULL || pObj->signature != pObj) {
    tscError("pObj:%p is NULL or freed", pObj);
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
    return NULL;
  }

  // compare the length before narrowing it, so an oversized string cannot wrap around
  // to a small/negative int32_t and slip past the limit check
  size_t sqlLen = strlen(sqlstr);
  if (sqlLen > (size_t)INT32_MAX || (int32_t)sqlLen > tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    tscQueueAsyncError(fp, param, terrno);
    return NULL;
  }

  nPrintTsc("%s", sqlstr);

  SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
  if (pSql == NULL) {
    tscError("failed to malloc sqlObj");
    // set terrno like the other early-exit paths do
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
    return NULL;
  }

  doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen);

  return pSql;
}

D
dapan1121 已提交
121

122
/*
 * Internal fetch callback placed in front of the user callback (pSql->fetchFp).
 *
 * param      user argument forwarded to pSql->fetchFp
 * tres       sql object; a NULL value is ignored
 * numOfRows  value reported by the retrieve step; 0 means the current source is drained
 *
 * When no rows were returned it first tries the next vnode, then the next sibling clause,
 * and only reports 0 rows to the user when neither is available.
 */
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
  if (tres == NULL) {
    return;
  }

  SSqlObj *pSql = (SSqlObj *)tres;
  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  if (numOfRows == 0) {
    if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from the remaining vnodes.
      tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
    } else {
      /*
       * all available virtual nodes have been checked already, now we need to check
       * for the next subclause queries
       */
      if (pCmd->active->sibling != NULL) {
        pCmd->active = pCmd->active->sibling;
        tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
        return;
      }

      /*
       * 1. the limitation has been reached
       * 2. no remaining virtual nodes to be retrieved anymore
       */
      (*pSql->fetchFp)(param, pSql, 0);
    }

    return;
  }

  // local merge has handled this situation during super table non-projection query.
  if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE) {
    pRes->numOfClauseTotal += pRes->numOfRows;
  }

  (*pSql->fetchFp)(param, tres, numOfRows);
}

// actual continue retrieve function with user-specified callback function
H
Haojun Liao 已提交
164
/*
 * Continue with the retrieve step after a query has completed.
 *
 * param      unused here
 * tres       sql object; NULL is logged and ignored
 * numOfRows  value reported by the query step; a non-zero value is stored as the error code
 * fp         callback installed in pSql->fp for the retrieve step
 */
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, __async_cb_func_t fp) {
  SSqlObj *pSql = (SSqlObj *)tres;
  if (pSql == NULL) {  // error
    tscError("sql object is NULL");
    return;
  }

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;

  // for commands below TSDB_SQL_LOCAL, a missing query handle or a non-zero numOfRows is an error
  if ((pRes->qId == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) {
    if (pRes->qId == 0 && numOfRows != 0) {
      tscError("qhandle is NULL");
    } else {
      pRes->code = numOfRows;
    }

    // make sure an error code is always reported on this path
    if (pRes->code == TSDB_CODE_SUCCESS) {
      pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
    }

    tscAsyncResultOnError(pSql);
    return;
  }

  pSql->fp = fp;
  if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
    pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
  }

  if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
    tscFetchDatablockForSubquery(pSql);
  } else {
    tscBuildAndSendRequest(pSql, NULL);
  }
}

/*
201 202 203
 * retrieve callback for fetch rows proxy.
 * The below two functions both serve as the callback function of query virtual node.
 * query callback first, and then followed by retrieve callback
H
hzcheng 已提交
204
 */
205 206
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
  // query completed, continue to retrieve
207
  tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
H
hzcheng 已提交
208 209
}

H
Haojun Liao 已提交
210 211
/*
 * Asynchronously fetch the next batch of rows of a result set.
 *
 * tres   result handle; must be a live sql object (its signature is checked)
 * fp     user callback, stored in pSql->fetchFp
 * param  opaque user argument handed back to fp
 *
 * An invalid handle is reported to fp through tscQueueAsyncError(); an invalid query
 * handle is reported through tscAsyncResultOnError().
 */
void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
  SSqlObj *pSql = (SSqlObj *)tres;
  if (pSql == NULL || pSql->signature != pSql) {
    tscError("sql object is NULL");
    tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
    return;
  }

  SSqlRes *pRes = &pSql->res;
  SSqlCmd *pCmd = &pSql->cmd;

  // user-defined callback function is stored in fetchFp; the proxy is what gets called first
  pSql->fetchFp = fp;
  pSql->fp      = tscAsyncFetchRowsProxy;
  pSql->param   = param;

  tscResetForNextRetrieve(pRes);

  // handle outer query based on the already retrieved nest query results.
  SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
  if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
    SSchedMsg schedMsg = {0};
    schedMsg.fp      = doRetrieveSubqueryData;
    schedMsg.ahandle = (void *)pSql;
    schedMsg.thandle = (void *)1;
    schedMsg.msg     = 0;
    taosScheduleTask(tscQhandle, &schedMsg);
    return;
  }

  if (pRes->qId == 0 && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
    tscError("qhandle is invalid");
    pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
    tscAsyncResultOnError(pSql);
    return;
  }

  if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
    tscFetchDatablockForSubquery(pSql);
  } else if (pRes->completed) {
    if(pCmd->command == TSDB_SQL_FETCH || (pCmd->command >= TSDB_SQL_SERV_STATUS && pCmd->command <= TSDB_SQL_CURRENT_USER)) {
      if (hasMoreVnodesToTry(pSql)) {  // sequentially retrieve data from the remaining vnodes.
        tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
      } else {
        /*
         * all available virtual nodes in the current clause have been checked already, now try the
         * next one in the following union subclause
         */
        if (pCmd->active->sibling != NULL) {
          pCmd->active = pCmd->active->sibling;  // todo refactor
          tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
          return;
        }

        /*
         * 1. the limitation has been reached
         * 2. no remaining virtual nodes to be retrieved anymore
         */
        (*pSql->fetchFp)(param, pSql, 0);
      }

      return;
    } else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE) {
      // in case of show command, return no data
      (*pSql->fetchFp)(param, pSql, 0);
    } else {
      assert(0);
    }
  } else { // current query is not completed, continue retrieve from node
    if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
      pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
    }

    SQueryInfo* pQueryInfo1 = tscGetQueryInfo(&pSql->cmd);
    tscBuildAndSendRequest(pSql, pQueryInfo1);
  }
}

H
Haojun Liao 已提交
288
// this function will be executed by queue task threads, so the terrno is not valid
H
[td-99]  
hjxilinx 已提交
289
static void tscProcessAsyncError(SSchedMsg *pMsg) {
H
hzcheng 已提交
290
  void (*fp)() = pMsg->ahandle;
H
Haojun Liao 已提交
291
  terrno = *(int32_t*) pMsg->msg;
Y
yihaoDeng 已提交
292
  tfree(pMsg->msg);
Y
TD-2662  
yihaoDeng 已提交
293
  (*fp)(pMsg->thandle, NULL, terrno);
H
hzcheng 已提交
294 295
}

H
[td-99]  
hjxilinx 已提交
296 297 298 299
void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
  int32_t* c = malloc(sizeof(int32_t));
  *c = code;
  
H
Haojun Liao 已提交
300
  SSchedMsg schedMsg = {0};
H
hzcheng 已提交
301 302 303
  schedMsg.fp = tscProcessAsyncError;
  schedMsg.ahandle = fp;
  schedMsg.thandle = param;
H
[td-99]  
hjxilinx 已提交
304
  schedMsg.msg = c;
H
hzcheng 已提交
305 306 307
  taosScheduleTask(tscQhandle, &schedMsg);
}

D
fix bug  
dapan1121 已提交
308
/*
 * Scheduler task that reports the error stored in pSql->res.code to the user callback.
 *
 * pMsg->ahandle carries the reference id (pSql->self) of the sql object, not a pointer;
 * the object is looked up through taosAcquireRef() and may already be gone.
 */
static void tscAsyncResultCallback(SSchedMsg *pMsg) {
  SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pMsg->ahandle);
  if (pSql == NULL || pSql->signature != pSql) {
    tscDebug("%p SqlObj is freed, not add into queue async res", pMsg->ahandle);
    // a non-NULL result means the reference was acquired, so it has to be given back
    if (pSql != NULL) {
      taosReleaseRef(tscObjRef, (int64_t)pMsg->ahandle);
    }
    return;
  }

  assert(pSql->res.code != TSDB_CODE_SUCCESS);
  if (tsShortcutFlag) {
    tscDebug("0x%" PRIx64 " async result callback, code:%s", pSql->self, tstrerror(pSql->res.code));
    pSql->res.code = TSDB_CODE_SUCCESS;
  } else {
    tscError("0x%" PRIx64 " async result callback, code:%s", pSql->self, tstrerror(pSql->res.code));
  }

  SSqlRes *pRes = &pSql->res;
  if (pSql->fp == NULL || pSql->fetchFp == NULL){
    taosReleaseRef(tscObjRef, pSql->self);
    return;
  }

  // the error is reported to the user-level callback
  pSql->fp = pSql->fetchFp;
  (*pSql->fp)(pSql->param, pSql, pRes->code);
  taosReleaseRef(tscObjRef, pSql->self);
}

H
Haojun Liao 已提交
334
/*
 * Schedule tscAsyncResultCallback() for the given sql object on the tscQhandle scheduler.
 * Only the reference id (pSql->self) is queued, not the pointer, so the task can detect
 * an object that has been released in the meantime.
 */
void tscAsyncResultOnError(SSqlObj* pSql) {
  SSchedMsg schedMsg = {0};
  schedMsg.fp = tscAsyncResultCallback;
  schedMsg.ahandle = (void *)pSql->self;
  schedMsg.thandle = (void *)1;
  schedMsg.msg = 0;
  taosScheduleTask(tscQhandle, &schedMsg);
}

343
// forward declarations of functions used below in this file
int tscSendMsgToServer(SSqlObj *pSql);
void tscClearTableMeta(SSqlObj *pSql);
D
fix bug  
dapan1121 已提交
345

X
xywang 已提交
346 347 348 349
// Element destructor handed to taosArrayDestroyEx(): p points at a stored char* that is released.
static void freeElem(void* p) {
  char **slot = (char **)p;
  tfree(*slot);
}

H
hjxilinx 已提交
350
/*
 * Callback invoked when a table-meta / vgroup-list request issued on behalf of a sql object
 * has finished.
 *
 * param  reference id of the parent sql object (compared against pSql->self)
 * res    the sub sql object that carried out the meta request
 * code   result of the meta request
 *
 * On success the parent statement is parsed again and executed (or, for a stream, the stream
 * callback is invoked). On failure the code is stored in pSql->res.code and reported through
 * tscAsyncResultOnError(). The reference acquired at the top is released on every exit path.
 */
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
  SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param);
  if (pSql == NULL) return;

  assert(pSql->signature == pSql && (int64_t)param == pSql->self);

  SSqlCmd *pCmd = &pSql->cmd;
  SSqlRes *pRes = &pSql->res;
  pRes->code = code;

  SSqlObj *sub = (SSqlObj*) res;
  const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"multi-tableMeta";
  if (code != TSDB_CODE_SUCCESS) {
    tscError("0x%"PRIx64" get %s failed, code:%s", pSql->self, msg, tstrerror(code));
    if (code == TSDB_CODE_RPC_FQDN_ERROR) {
      // carry the error message of the sub object over to the parent object
      size_t sz = strlen(tscGetErrorMsgPayload(&sub->cmd));
      tscAllocPayload(&pSql->cmd, (int)sz + 1);
      memcpy(tscGetErrorMsgPayload(&pSql->cmd), tscGetErrorMsgPayload(&sub->cmd), sz);
    } else if (code == TSDB_CODE_MND_INVALID_TABLE_NAME) {
      if (pSql->cmd.command == TSDB_SQL_MULTI_META && pSql->cmd.hashedTableNames) {
        // remove the table-meta entries of the requested names before dropping the name list
        tscClearTableMeta(pSql);
        taosArrayDestroyEx(&pSql->cmd.hashedTableNames, freeElem);
        pSql->cmd.hashedTableNames = NULL;
      }
    }
    goto _error;
  }

  if (pSql->cmd.command == TSDB_SQL_MULTI_META) {
    if (pSql->cmd.hashedTableNames) {
      taosArrayDestroyEx(&pSql->cmd.hashedTableNames, freeElem);
      pSql->cmd.hashedTableNames = NULL;
    }
  }

  tscDebug("0x%"PRIx64" get %s successfully", pSql->self, msg);
  if (pSql->pStream == NULL) {
    SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);

    if (pQueryInfo != NULL && TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
      tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self);

      code = tsParseSql(pSql, false);
      if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        taosReleaseRef(tscObjRef, pSql->self);
        return;
      } else if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }

      if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {  // stmt insert
        (*pSql->fp)(pSql->param, pSql, code);
      } else if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { // file insert
        tscImportDataFromFile(pSql);
      } else {  // sql string insert
        tscHandleMultivnodeInsert(pSql);
      }
    } else {
      if (pSql->retryReason != TSDB_CODE_SUCCESS) {
        tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again", pSql->self);
        pSql->retryReason = TSDB_CODE_SUCCESS;
      } else {
        tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self);
      }

      code = tsParseSql(pSql, true);
      if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
        taosReleaseRef(tscObjRef, pSql->self);
        return;
      } else if (code != TSDB_CODE_SUCCESS) {
        goto _error;
      }

      SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd);
      executeQuery(pSql, pQueryInfo1);
    }

    taosReleaseRef(tscObjRef, pSql->self);
    return;
  } else {  // stream computing
    tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pCmd->command);

    SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
    if (tscNumOfExprs(pQueryInfo) == 0) {
      tsParseSql(pSql, false);
    }

    (*pSql->fp)(pSql->param, pSql, code);
    taosReleaseRef(tscObjRef, pSql->self);
    return;
  }

  _error:
  pRes->code = code;
  tscAsyncResultOnError(pSql);
  taosReleaseRef(tscObjRef, pSql->self);
}
447 448 449 450 451 452 453 454 455 456

/*
 * Remove every name listed in pSql->cmd.hashedTableNames from the hash table
 * obtained through UTIL_GET_TABLEMETA(pSql). The name list itself is left untouched.
 */
void tscClearTableMeta(SSqlObj *pSql) {
  SSqlCmd *cmd = &pSql->cmd;
  int32_t  total = taosArrayGetSize(cmd->hashedTableNames);

  int32_t idx = 0;
  while (idx < total) {
    char *name = taosArrayGetP(cmd->hashedTableNames, idx);
    taosHashRemove(UTIL_GET_TABLEMETA(pSql), name, strlen(name));
    ++idx;
  }
}