executor.c 34.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14 15 16
 */

#include "executor.h"
H
Haojun Liao 已提交
17
#include "executorimpl.h"
18
#include "planner.h"
L
Liu Jicong 已提交
19
#include "tdatablock.h"
L
Liu Jicong 已提交
20
#include "tref.h"
21
#include "tudf.h"
L
Liu Jicong 已提交
22
#include "vnode.h"
23 24 25 26 27 28 29 30 31

/* Guards the one-time creation of the exchange-object ref pool. */
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
/* Ref pool id: -1 before initRefPool has run, 0 after cleanupRefPool has closed it. */
int32_t             exchangeObjRefPool = -1;

/* Open the ref pool (capacity 1024), handing doDestroyExchangeOperatorInfo to it as the destroy callback. */
static void initRefPool() {
  int32_t poolId = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
  exchangeObjRefPool = poolId;
}

/* Compare-and-swap the stored pool id to 0 and close the pool id that was stored before. */
static void cleanupRefPool() {
  int32_t expected = exchangeObjRefPool;
  int32_t prevPool = atomic_val_compare_exchange_32(&exchangeObjRefPool, expected, 0);
  taosCloseRef(prevPool);
}
32

L
Liu Jicong 已提交
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
/*
 * Walk down the single-child operator chain starting at pOperator until the stream scan operator is
 * reached, then queue the SMA input on its pBlockLists. Every visited operator, the scan operator
 * included, is reset to OP_NOT_OPENED.
 *
 * type selects how `input` is interpreted:
 *   STREAM_INPUT__MERGED_SUBMIT - `input` is an array of numOfBlocks SSubmitReq pointers;
 *   STREAM_INPUT__DATA_SUBMIT   - `input` itself is the single submit request;
 *   STREAM_INPUT__DATA_BLOCK    - `input` is a contiguous array of numOfBlocks SSDataBlock.
 * Any other type queues nothing and still reports success.
 *
 * Returns TSDB_CODE_QRY_APP_ERROR when the chain ends or fans out before a stream scan operator is
 * found, TSDB_CODE_OUT_OF_MEMORY when a block cannot be queued, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  ASSERT(pOperator != NULL);
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }
    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;

  SStreamScanInfo* pInfo = pOperator->info;

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
      if (taosArrayPush(pInfo->pBlockLists, &pReq) == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    if (taosArrayPush(pInfo->pBlockLists, &input) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      if (taosArrayPush(pInfo->pBlockLists, &pDataBlock) == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
73
/*
 * Walk down the single-child operator chain starting at pOperator until the stream scan operator is
 * reached, then queue `input` on its pBlockLists. Every visited operator, the scan operator included,
 * is reset to OP_NOT_OPENED. The scan operator must have no pending blocks on entry
 * (validBlockIndex == 0, empty pBlockLists).
 *
 * type selects how `input` is interpreted:
 *   STREAM_INPUT__MERGED_SUBMIT - `input` is an array of numOfBlocks SSubmitReq pointers;
 *   STREAM_INPUT__DATA_SUBMIT   - `input` itself is the single submit request (numOfBlocks == 1);
 *   STREAM_INPUT__DATA_BLOCK    - `input` is a contiguous array of numOfBlocks SSDataBlock.
 * Any other type trips ASSERT(0).
 *
 * Returns TSDB_CODE_QRY_APP_ERROR when the chain ends or fans out before a stream scan operator is
 * found, TSDB_CODE_OUT_OF_MEMORY when a block cannot be queued, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  ASSERT(pOperator != NULL);
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }
    pOperator->status = OP_NOT_OPENED;
    return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;

  SStreamScanInfo* pInfo = pOperator->info;

  ASSERT(pInfo->validBlockIndex == 0);
  ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
      if (taosArrayPush(pInfo->pBlockLists, &pReq) == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    ASSERT(numOfBlocks == 1);
    if (taosArrayPush(pInfo->pBlockLists, &input) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      if (taosArrayPush(pInfo->pBlockLists, &pDataBlock) == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}

120 121
static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); }

L
Liu Jicong 已提交
122
/*
 * Route numOfBlocks input blocks of the given stream input type to the task's stream scan operator
 * via doSetStreamBlock.
 * Returns TSDB_CODE_QRY_APP_ERROR for a NULL task, TSDB_CODE_SUCCESS when there is no input,
 * otherwise doSetStreamBlock's result.
 */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  bool hasInput = (pBlocks != NULL) && (numOfBlocks > 0);
  if (!hasInput) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetStreamBlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed to set the stream block data", GET_TASKID(pTask));
    return code;
  }

  qDebug("%s set the stream block successfully", GET_TASKID(pTask));
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
/*
 * SMA counterpart of qSetMultiStreamInput: route numOfBlocks blocks of the given input type to the
 * task's stream scan operator via doSetSMABlock.
 * Returns TSDB_CODE_QRY_APP_ERROR for a NULL task, TSDB_CODE_SUCCESS when there is no input,
 * otherwise doSetSMABlock's result.
 */
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  if (pTask == NULL) {
    return TSDB_CODE_QRY_APP_ERROR;
  }
  if (numOfBlocks == 0 || pBlocks == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = doSetSMABlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));
  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTask));
  }
  return code;
}

H
Haojun Liao 已提交
164
/*
 * Create an executor task running in OPTR_EXEC_MODEL_QUEUE mode.
 *
 * msg == NULL: build a bare task whose root is a raw scan operator over `readers`;
 *              numOfCols and pSchema are not touched.
 * otherwise:   parse msg as a serialized subplan and build the task from it; *numOfCols receives
 *              the number of slots flagged `output` in the root node's output data block and
 *              *pSchema a clone of the task's schemaInfo.qsw.
 *
 * Returns the task handle, or NULL with terrno set on failure.
 */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
  if (msg == NULL) {
    // raw scan: no plan, a single raw scan operator acts as the root
    SExecTaskInfo* pRawTask = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
    if (pRawTask == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    setTaskStatus(pRawTask, TASK_NOT_COMPLETED);
    pRawTask->cost.created = taosGetTimestampMs();
    pRawTask->execModel = OPTR_EXEC_MODEL_QUEUE;
    pRawTask->pRoot = createRawScanOperatorInfo(readers, pRawTask);
    if (pRawTask->pRoot != NULL) {
      return pRawTask;
    }

    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pRawTask);
    return NULL;
  }

  struct SSubplan* pPlan = NULL;
  int32_t          code = qStringToSubplan(msg, &pPlan);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTaskInfo = NULL;
  code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
  if (code != TSDB_CODE_SUCCESS) {
    nodesDestroyNode((SNode*)pPlan);
    qDestroyTask(pTaskInfo);
    terrno = code;
    return NULL;
  }

  // count the slots of the root node's output block that are flagged as output columns
  SDataBlockDescNode* pOutputDesc = pPlan->pNode->pOutputDataBlockDesc;
  int32_t             outputCols = 0;
  SNode*              pSlot = NULL;
  FOREACH(pSlot, pOutputDesc->pSlots) {
    if (((SSlotDescNode*)pSlot)->output) {
      outputCols++;
    }
  }
  *numOfCols = outputCols;

  *pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);
  return pTaskInfo;
}

L
Liu Jicong 已提交
218
/*
 * Create an executor task running in OPTR_EXEC_MODEL_STREAM mode from a serialized subplan.
 * Returns NULL for a NULL msg (terrno untouched), NULL with terrno set when parsing or task creation
 * fails, otherwise the task handle.
 */
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
  if (msg == NULL) {
    return NULL;
  }

  struct SSubplan* pPlan = NULL;
  int32_t          code = qStringToSubplan(msg, &pPlan);
  if (code == TSDB_CODE_SUCCESS) {
    qTaskInfo_t pTaskInfo = NULL;
    code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
    if (code == TSDB_CODE_SUCCESS) {
      return pTaskInfo;
    }

    // task creation failed: release both the parsed plan and whatever part of the task was built
    nodesDestroyNode((SNode*)pPlan);
    qDestroyTask(pTaskInfo);
  }

  terrno = code;
  return NULL;
}
243

244
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
245 246 247 248 249 250
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));

  // let's discard the tables those are not created according to the queried super table.
  SMetaReader mr = {0};
  metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
  for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
251
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
252 253 254

    int32_t code = metaGetTableEntryByUid(&mr, *id);
    if (code != TSDB_CODE_SUCCESS) {
255
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
256 257 258
      continue;
    }

M
Minglei Jin 已提交
259 260
    tDecoderClear(&mr.coder);

261
    // TODO handle ntb case
L
Liu Jicong 已提交
262
    if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
263 264
      continue;
    }
265 266 267

    if (pScanInfo->pTagCond != NULL) {
      bool          qualified = false;
268
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
H
Haojun Liao 已提交
269
      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.meta, &qualified);
270 271 272 273 274 275 276 277 278 279
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
        continue;
      }

      if (!qualified) {
        continue;
      }
    }

280
    // handle multiple partition
281 282 283 284 285 286 287
    taosArrayPush(qa, id);
  }

  metaReaderClear(&mr);
  return qa;
}

288
/*
 * Add (isAdd == true) or remove (isAdd == false) child-table uids in the task's stream scanner.
 *
 * Add:
 *   - tableIdList is narrowed by filterUnqualifiedTables (super-table membership and tag condition);
 *   - the surviving uids are added to the tq reader and appended to the task's tableqinfoList;
 *   - when pGroupTags is configured, each uid gets a group id: the uid itself when grouping by tbname,
 *     otherwise one computed from the tag values.
 * Remove: every uid in tableIdList is removed from the tq reader.
 *
 * The stream scanner is located by following pDownstream[0] from the root.
 * Returns:
 *   - TSDB_CODE_QRY_APP_ERROR if that chain ends before a stream scan operator;
 *   - TSDB_CODE_OUT_OF_MEMORY on allocation failure;
 *   - otherwise the first error from the reader/meta helpers, or TSDB_CODE_SUCCESS.
 */
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  if (isAdd) {
    qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
  }

  // traverse to the stream scanner node; stop instead of dereferencing a missing child
  SOperatorInfo* pInfo = pTaskInfo->pRoot;
  while (pInfo != NULL && pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    pInfo = (pInfo->numOfDownstream > 0) ? pInfo->pDownstream[0] : NULL;
  }

  if (pInfo == NULL) {
    qError("failed to find stream scan operator to update table list, %s", GET_TASKID(pTaskInfo));
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SStreamScanInfo* pScanInfo = pInfo->info;

  if (!isAdd) {  // remove the table id in current list
    qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
    return tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
  }

  SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
  if (qa == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
  int32_t code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(qa);
    return code;
  }

  // todo refactor STableList
  bool   assignUid = false;
  size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
  char*  keyBuf = NULL;
  if (bufLen > 0) {
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      taosArrayDestroy(qa);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
    uint64_t*     uid = taosArrayGet(qa, i);
    STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};

    if (bufLen > 0) {
      if (assignUid) {
        // grouping by table name: every table forms its own group
        keyInfo.groupId = keyInfo.uid;
      } else {
        code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                     &keyInfo.groupId);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFree(keyBuf);
          taosArrayDestroy(qa);
          return code;
        }
      }
    }

    addTableIntoTableList(&pTaskInfo->tableqinfoList, keyInfo.uid, keyInfo.groupId);
  }

  if (keyBuf != NULL) {
    taosMemoryFree(keyBuf);
  }

  taosArrayDestroy(qa);
  return code;
}
373

374
/*
 * Copy the schema version, tag version, db name and table name recorded in the task's schemaInfo.
 * When no schema wrapper (schemaInfo.sw) has been recorded, every output is left untouched.
 * A missing db/table name yields an empty string. Names are copied with strcpy, so dbName and
 * tableName must be large enough for the stored names; no bound is checked here.
 * Always returns 0 (TSDB_CODE_SUCCESS).
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
                                    int32_t* tversion) {
  ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  // nothing recorded yet: outputs stay as the caller initialized them
  if (pTask->schemaInfo.sw == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *sversion = pTask->schemaInfo.sw->version;
  *tversion = pTask->schemaInfo.tversion;

  if (pTask->schemaInfo.dbname == NULL) {
    dbName[0] = 0;
  } else {
    strcpy(dbName, pTask->schemaInfo.dbname);
  }

  if (pTask->schemaInfo.tablename == NULL) {
    tableName[0] = 0;
  } else {
    strcpy(tableName, pTask->schemaInfo.tablename);
  }

  return 0;
}
398 399

int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
D
dapan1121 已提交
400
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
401 402 403 404 405 406
  assert(pSubplan != NULL);
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;

  taosThreadOnce(&initPoolOnce, initRefPool);
  atexit(cleanupRefPool);

407
  qDebug("start to create subplan task, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
408

409 410
  int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
  if (code != TSDB_CODE_SUCCESS) {
411
    qError("failed to createExecTaskInfoImpl, code: %s", tstrerror(code));
412 413 414
    goto _error;
  }

415
  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
416 417
  code = dsDataSinkMgtInit(&cfg);
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
418
    qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
419 420 421 422 423 424 425
    goto _error;
  }

  if (handle) {
    void* pSinkParam = NULL;
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
426
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
427 428 429
      goto _error;
    }

H
Haojun Liao 已提交
430
    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
L
Liu Jicong 已提交
431
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
432 433
      taosMemoryFreeClear(pSinkParam);
    }
434 435
  }

436
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
437

L
Liu Jicong 已提交
438
_error:
439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

#ifdef TEST_IMPL
/*
 * Test hook (compiled only with TEST_IMPL): delay execution when the SQL text asks for it.
 *
 * A query containing " count(*)" never waits. Otherwise the first " t_<n><unit>" in the text selects
 * the delay; unit is 'h', 'm' or 's', and anything else means milliseconds.
 * Waits of at least one second sleep in 1000 ms steps and stop early once isTaskKilled(pQInfo)
 * reports the task as killed.
 *
 * Returns 0 when no wait applies. Returns 1 after waiting, and also when pQInfo->sql is NULL.
 * NOTE(review): the unit multiplication is done in int and can overflow for large hour/minute values.
 */
int waitMoment(SQInfo* pQInfo) {
  const char* sql = pQInfo->sql;
  if (sql == NULL) {
    return 1;
  }
  if (strstr(sql, " count(*)") != NULL) {
    return 0;
  }

  int   ms = 0;
  char* cursor = strstr(sql, " t_");
  if (cursor != NULL) {
    cursor += 3;
    ms = atoi(cursor);
    while (*cursor >= '0' && *cursor <= '9') {
      cursor++;
    }
    switch (*cursor) {
      case 'h':
        ms *= 3600 * 1000;
        break;
      case 'm':
        ms *= 60 * 1000;
        break;
      case 's':
        ms *= 1000;
        break;
      default:  // no unit suffix: the value is already in milliseconds
        break;
    }
  }

  if (ms == 0) {
    return 0;
  }
  printf("test wait sleep %dms. sql=%s ...\n", ms, sql);

  if (ms < 1000) {
    taosMsleep(ms);
    return 1;
  }

  for (int used_ms = 0; used_ms < ms;) {
    taosMsleep(1000);
    used_ms += 1000;
    if (isTaskKilled(pQInfo)) {
      printf("test check query is canceled, sleep break.%s\n", sql);
      break;
    }
  }
  return 1;
}
#endif

H
Haojun Liao 已提交
488
static void freeBlock(void* param) {
489
  SSDataBlock* pBlock = *(SSDataBlock**)param;
H
Haojun Liao 已提交
490 491 492
  blockDataDestroy(pBlock);
}

493
/*
 * Batch variant of qExecTask.
 *
 * Pulls blocks from the task's root operator until it returns NULL or at least 4096 rows have been
 * collected. Each block is appended to pResList as a copy made by createOneDataBlock(pRes, true).
 * pResList is cleared first; the blocks it previously held are destroyed with freeBlock.
 *
 * Parameters:
 *   pLocal    - when non-NULL, copied into pTaskInfo->localFetch before anything else runs.
 *   *hasMore  - true when the loop stopped before the root returned NULL; false otherwise, including
 *               on every early-return path.
 *   *useconds - receives the accumulated execution time, but only once the root operator is exhausted.
 *
 * Returns:
 *   - TSDB_CODE_QRY_IN_EXEC if another thread currently owns the task;
 *   - TSDB_CODE_SUCCESS if the task was already killed;
 *   - the error delivered through longjmp on pTaskInfo->env if execution aborts;
 *   - otherwise pTaskInfo->code, which is set to TSDB_CODE_OUT_OF_MEMORY if a result block cannot be
 *     copied or stored.
 */
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  // give *hasMore a defined value on every early-return path below
  *hasMore = false;

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClearEx(pResList, freeBlock);

  // claim the task for this thread; a non-zero previous owner means it is already executing elsewhere
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;

  int64_t st = taosGetTimestampUs();

  while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
    SSDataBlock* p = createOneDataBlock(pRes, true);
    if (p == NULL) {
      pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
      break;
    }

    ASSERT(p->info.rows > 0);
    if (taosArrayPush(pResList, &p) == NULL) {
      blockDataDestroy(p);
      pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
      break;
    }

    // stop once a batch of at least 4096 rows has been collected
    current += p->info.rows;
    if (current >= 4096) {
      break;
    }
  }

  *hasMore = (pRes != NULL);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

/*
 * Run the task's root operator once and return the block it produces in *pRes.
 *
 * *pRes is NULL when the root is exhausted, and also on the busy/killed/aborted paths.
 * *useconds receives the accumulated execution time only when the root returns NULL.
 *
 * Returns:
 *   - TSDB_CODE_QRY_IN_EXEC if another thread currently owns the task;
 *   - TSDB_CODE_SUCCESS if the task was already killed;
 *   - the error delivered through longjmp on pTaskInfo->env if execution aborts;
 *   - otherwise pTaskInfo->code.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;

  // claim the task for this thread; a non-zero previous owner means it is already executing elsewhere
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

/*
 * Flag the task as killed via setTaskKilled and return without waiting. qExecTask and qExecTaskOpt
 * check isTaskKilled before running.
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  if (pTaskInfo != NULL) {
    qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
    setTaskKilled(pTaskInfo);
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_QRY_INVALID_QHANDLE;
}

/*
 * Log the task's final row count, print its cost summary (queryCostStatis) and free it (doDestroyTask).
 * A NULL handle is ignored. A task whose root operator was never set is reported with 0 rows instead
 * of dereferencing a NULL pRoot; qSerializeTaskStatus treats a NULL pRoot as possible as well.
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  int64_t numOfRows = (pTaskInfo->pRoot != NULL) ? (int64_t)pTaskInfo->pRoot->resultInfo.totalRows : 0;
  qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), numOfRows);

  queryCostStatis(pTaskInfo);  // print the query cost summary
  doDestroyTask(pTaskInfo);
}

H
Haojun Liao 已提交
648
/* Collect per-operator explain/exec info for the whole operator tree into pExecInfoList. */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
  SOperatorInfo* pRootOperator = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOperatorExplainExecInfo(pRootOperator, pExecInfoList);
}

/*
 * Serialize the state of the task's operator tree into *pOutput / *len via encodeOperator.
 * When encoding succeeds but no operator contributed any value, the buffer is released and
 * *pOutput / *len are reset (NULL / 0).
 * Returns TSDB_CODE_INVALID_PARA for a NULL task, root operator or output pointer (the same
 * parameter validation qDeserializeTaskStatus applies), otherwise encodeOperator's result.
 */
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL || pTaskInfo->pRoot == NULL || pOutput == NULL || len == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t nOptrWithVal = 0;
  int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
  if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
    // nothing worth persisting: drop the buffer so callers see an empty result
    taosMemoryFreeClear(*pOutput);
    *len = 0;
  }
  return code;
}

/*
 * Restore the operator tree state from a buffer produced by qSerializeTaskStatus (decodeOperator).
 * Returns TSDB_CODE_INVALID_PARA for a NULL task, a NULL buffer or a zero length.
 */
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;

  bool invalid = (pTaskInfo == NULL) || (pInput == NULL) || (len == 0);
  if (invalid) {
    return TSDB_CODE_INVALID_PARA;
  }
  return decodeOperator(pTaskInfo->pRoot, pInput, len);
}

/*
 * Find the stream scan operator by following the single-child chain from the task's root and store
 * its info pointer in *scanner. Returns 0 on success.
 * If the chain ends or fans out before a stream scan operator, ASSERT fires (when enabled). In that
 * case *scanner is set to NULL and TSDB_CODE_QRY_APP_ERROR is returned; no out-of-range
 * pDownstream[0] access happens.
 */
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (pOperator != NULL) {
    uint16_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return 0;
    }

    ASSERT(pOperator->numOfDownstream == 1);
    if (pOperator->numOfDownstream != 1) {
      break;
    }
    pOperator = pOperator->pDownstream[0];
  }

  qError("%s failed to find stream scan operator", GET_TASKID(pTaskInfo));
  *scanner = NULL;
  return TSDB_CODE_QRY_APP_ERROR;
}

// Disabled by #if 0 (not compiled): would push pItem onto the stream task's input queue
// (pTaskInfo->streamInfo.inputQueue->queue).
#if 0
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
  taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
  return 0;
}
#endif

703
/*
 * Enter recover step PREPARE1 for a stream-mode task: record the recover range as
 * recoverStartVer = 0, recoverEndVer = ver. Always returns 0.
 */
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.recoverStartVer = 0;
  pTask->streamInfo.recoverEndVer = ver;
  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1;
  return 0;
}

/*
 * Enter recover step PREPARE2 for a stream-mode task. The previous end version becomes the new start
 * version and `ver` becomes the new end version. Always returns 0.
 */
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  // slide the window: [oldEnd, ver]
  pTask->streamInfo.recoverStartVer = pTask->streamInfo.recoverEndVer;
  pTask->streamInfo.recoverEndVer = ver;
  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2;
  return 0;
}

/* Leave recovery for a stream-mode task by resetting recoverStep to NONE. Always returns 0. */
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
  return 0;
}

/*
 * Prepare the operator chain for recovery. For every stream interval / session / state window
 * operator on the single-child chain from the root:
 *   - save its calTrigger and deleteMark into pTaskInfo->streamInfo.triggerSaved / deleteMarkSaved;
 *   - switch it to STREAM_TRIGGER_AT_ONCE with deleteMark = INT64_MAX.
 * Returns 0 when the chain ends at a leaf, -1 (after ASSERT) when an operator has more than one
 * downstream.
 */
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  for (SOperatorInfo* pOperator = pTaskInfo->pRoot;;) {
    switch (pOperator->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
        SStreamIntervalOperatorInfo* pIntervalInfo = pOperator->info;
        pTaskInfo->streamInfo.triggerSaved = pIntervalInfo->twAggSup.calTrigger;
        pTaskInfo->streamInfo.deleteMarkSaved = pIntervalInfo->twAggSup.deleteMark;
        pIntervalInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pIntervalInfo->twAggSup.deleteMark = INT64_MAX;
        break;
      }
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
        SStreamSessionAggOperatorInfo* pSessionInfo = pOperator->info;
        pTaskInfo->streamInfo.triggerSaved = pSessionInfo->twAggSup.calTrigger;
        pTaskInfo->streamInfo.deleteMarkSaved = pSessionInfo->twAggSup.deleteMark;
        pSessionInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pSessionInfo->twAggSup.deleteMark = INT64_MAX;
        break;
      }
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
        SStreamStateAggOperatorInfo* pStateInfo = pOperator->info;
        pTaskInfo->streamInfo.triggerSaved = pStateInfo->twAggSup.calTrigger;
        pTaskInfo->streamInfo.deleteMarkSaved = pStateInfo->twAggSup.deleteMark;
        pStateInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pStateInfo->twAggSup.deleteMark = INT64_MAX;
        break;
      }
      default:
        break;
    }

    // descend while there is exactly one non-NULL child
    if (pOperator->numOfDownstream == 1 && pOperator->pDownstream[0] != NULL) {
      pOperator = pOperator->pDownstream[0];
      continue;
    }

    if (pOperator->numOfDownstream > 1) {
      qError("unexpected stream, multiple downstream");
      ASSERT(0);
      return -1;
    }
    return 0;
  }
}

/*
 * Undo qStreamSetParamForRecover: write pTaskInfo->streamInfo.triggerSaved / deleteMarkSaved back into
 * every stream interval / session / state window operator on the single-child chain from the root.
 * Returns 0 when the chain ends at a leaf, -1 (after ASSERT) when an operator has more than one
 * downstream.
 */
int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  for (SOperatorInfo* pOperator = pTaskInfo->pRoot;;) {
    switch (pOperator->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
        SStreamIntervalOperatorInfo* pIntervalInfo = pOperator->info;
        pIntervalInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved;
        pIntervalInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
        break;
      }
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
        SStreamSessionAggOperatorInfo* pSessionInfo = pOperator->info;
        pSessionInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved;
        pSessionInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
        break;
      }
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
        SStreamStateAggOperatorInfo* pStateInfo = pOperator->info;
        pStateInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved;
        pStateInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
        break;
      }
      default:
        break;
    }

    // descend while there is exactly one non-NULL child
    if (pOperator->numOfDownstream == 1 && pOperator->pDownstream[0] != NULL) {
      pOperator = pOperator->pDownstream[0];
      continue;
    }

    if (pOperator->numOfDownstream > 1) {
      qError("unexpected stream, multiple downstream");
      ASSERT(0);
      return -1;
    }
    return 0;
  }
}

void* qExtractReaderFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return (void*)pInfo->tqReader;
}

wmmhello's avatar
wmmhello 已提交
819 820 821 822 823 824 825 826
/* Return the schema pointer held in the task's stream info (no copy is made). */
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  const SExecTaskInfo* pTask = (const SExecTaskInfo*)tinfo;
  return pTask->streamInfo.schema;
}

/* Return the table name held in the task's stream info (no copy is made). */
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  const SExecTaskInfo* pTask = (const SExecTaskInfo*)tinfo;
  return pTask->streamInfo.tbName;
}

/*
 * Return a pointer to the meta response stored in the task's stream info.
 * Only meant for queue-mode tasks (asserted). The pointer refers to a field of
 * the task itself, not to a copy.
 */
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  SMqMetaRsp* pMetaRsp = &pTask->streamInfo.metaRsp;
  return pMetaRsp;
}

/* Return the uid of the offset last passed to qStreamPrepareScan (queue-mode tasks only). */
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) {
  const SExecTaskInfo* pTask = (const SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  return pTask->streamInfo.prepareStatus.uid;
}

/*
 * Copy the task's last recorded offset (streamInfo.lastStatus) into *pOffset.
 * Queue-mode tasks only (asserted). Always returns 0.
 */
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  const SExecTaskInfo* pTask = (const SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  const STqOffsetVal* pLast = &pTask->streamInfo.lastStatus;
  memcpy(pOffset, pLast, sizeof(*pOffset));
  return 0;
}

/*
 * Initialise pCond for a TMQ snapshot scan of the table described by pMtInfo.
 *
 * pCond is zeroed first and then set up as follows:
 *   - ascending order;
 *   - time window [INT64_MIN, INT64_MAX];
 *   - super-table uid pMtInfo->suid;
 *   - versions from -1 up to sContext->snapVersion;
 *   - one column entry (type, bytes, colId) per column of pMtInfo->schema.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_OUT_OF_MEMORY (also stored in
 * terrno) when the column list cannot be allocated. On failure pCond stays
 * zeroed apart from `order`: colList is NULL and numOfCols is 0, so a caller
 * that ignores the error cannot iterate a missing column list.
 */
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
  memset(pCond, 0, sizeof(SQueryTableDataCond));
  pCond->order = TSDB_ORDER_ASC;

  pCond->colList = taosMemoryCalloc(pMtInfo->schema->nCols, sizeof(SColumnInfo));
  if (pCond->colList == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }
  // Publish the column count only once the list backing it exists.
  pCond->numOfCols = pMtInfo->schema->nCols;

  pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
  pCond->suid = pMtInfo->suid;
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
    pCond->colList[i].type = pMtInfo->schema->pSchema[i].type;
    pCond->colList[i].bytes = pMtInfo->schema->pSchema[i].bytes;
    pCond->colList[i].colId = pMtInfo->schema->pSchema[i].colId;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Attach a submit request to a queue-mode task by storing the pointer in
 * streamInfo.pReq (no copy). The slot must be empty (asserted). Always returns 0.
 */
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);
  ASSERT(pTask->streamInfo.pReq == NULL);

  pTask->streamInfo.pReq = pReq;
  return 0;
}

/*
 * Reposition a queue-mode (TMQ) task so that its next execution resumes at pOffset.
 *
 * pOffset is always recorded in streamInfo.prepareStatus and streamInfo.returned
 * is reset to 0. If pOffset equals streamInfo.lastStatus nothing else is done.
 *
 * Column subscriptions (subType == TOPIC_SUB_TYPE__COLUMN) reposition the stream
 * scan operator, which is either the root or the root's single downstream:
 *   - TMQ_OFFSET__LOG: the tsdb reader is closed and the WAL reader is seeked to
 *     pOffset->version + 1.
 *   - TMQ_OFFSET__SNAPSHOT_DATA: the table-scan cursor is moved to table
 *     pOffset->uid, and the tsdb reader is reset to start at pOffset->ts + 1.
 *     When uid is 0, the first table in the task's list is used and the reader
 *     starts at INT64_MIN + 1.
 *
 * Other subscription types reposition the raw scan operator at the root,
 * according to pOffset->type (snapshot data, snapshot meta, or log).
 *
 * Returns 0 on success, -1 on failure.
 */
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
  pTaskInfo->streamInfo.prepareStatus = *pOffset;
  pTaskInfo->streamInfo.returned = 0;

  // Already positioned at this offset: nothing to reposition.
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
    return 0;
  }

  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    uint16_t type = pOperator->operatorType;
    pOperator->status = OP_OPENED;
    // TODO add more check
    if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      ASSERT(pOperator->numOfDownstream == 1);
      pOperator = pOperator->pDownstream[0];
    }

    SStreamScanInfo* pInfo = pOperator->info;
    if (pOffset->type == TMQ_OFFSET__LOG) {
      // Leave tsdb scanning: drop the tsdb reader and resume the WAL after pOffset->version.
      STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
      tsdbReaderClose(pTSInfo->dataReader);
      pTSInfo->dataReader = NULL;

      if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
        return -1;
      }
      ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;

      // uid 0: start from the first table in the task's list, at the earliest timestamp.
      if (uid == 0) {
        if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) == 0) {
          return -1;
        }
        STableKeyInfo* pFirst = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
        uid = pFirst->uid;
        ts = INT64_MIN;
      }

      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
      int32_t         tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);

#ifndef NDEBUG
      qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
             pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows);
      pInfo->pTableScanOp->resultInfo.totalRows = 0;
#endif

      // Point the table-scan cursor at the table being resumed.
      bool found = false;
      for (int32_t i = 0; i < tableSz; i++) {
        STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
        if (pTableInfo->uid == uid) {
          found = true;
          pTableScanInfo->currentTable = i;
          break;
        }
      }

      // TODO after dropping table, table may not found
      ASSERT(found);

      if (pTableScanInfo->dataReader == NULL) {
        if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond,
                           pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 ||
            pTableScanInfo->dataReader == NULL) {
          qError("tmq prepare scan: failed to open tsdb reader, uid %" PRId64, uid);
          ASSERT(0);
          // Release builds: do not go on to use a NULL reader below.
          return -1;
        }
      }

      tsdbSetTableId(pTableScanInfo->dataReader, uid);

      // Resume strictly after ts: narrow the window start only for this reset, then restore it.
      int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
      pTableScanInfo->cond.twindows.skey = ts + 1;
      tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
      pTableScanInfo->cond.twindows.skey = oldSkey;
      pTableScanInfo->scanTimes = 0;

      qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
             ts, pTableScanInfo->currentTable, tableSz);
    } else {
      ASSERT(0);
    }
  } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SStreamRawScanInfo* pInfo = pOperator->info;
    SSnapContext*       sContext = pInfo->sContext;
    if (setForSnapShot(sContext, pOffset->uid) != 0) {
      qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid);
      return -1;
    }

    // Tear down the previous table's reader, condition and table list.
    SMetaTableInfo mtInfo = getUidfromSnapShot(sContext);
    tsdbReaderClose(pInfo->dataReader);
    pInfo->dataReader = NULL;
    cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
    taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList);
    // Clear the freed pointer. Otherwise the early return below would leave it
    // dangling, and the next call would destroy it a second time.
    pTaskInfo->tableqinfoList.pTableList = NULL;
    if (mtInfo.uid == 0) return 0;  // no data

    if (initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo) != TSDB_CODE_SUCCESS) {
      qError("tmqsnap qStreamPrepareScan failed to init table cond, uid %" PRId64, mtInfo.uid);
      // mtInfo.schema is only moved into streamInfo.schema on success; free it here to avoid a leak.
      tDeleteSSchemaWrapper(mtInfo.schema);
      return -1;
    }
    pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;

    pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
    if (pTaskInfo->tableqinfoList.pTableList == NULL) {
      qError("tmqsnap qStreamPrepareScan failed to alloc table list, uid %" PRId64, mtInfo.uid);
      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tDeleteSSchemaWrapper(mtInfo.schema);
      return -1;
    }
    taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
    if (tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList,
                       &pInfo->dataReader, NULL) < 0) {
      // Kept best-effort as before (the scan proceeds without a reader), but no longer silent.
      qError("tmqsnap qStreamPrepareScan failed to open tsdb reader, uid %" PRId64, mtInfo.uid);
    }

    // The condition is only needed to open the reader; release its column list now.
    cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);

    strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
    tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
    pTaskInfo->streamInfo.schema = mtInfo.schema;

    qDebug("tmqsnap qStreamPrepareScan snapshot data uid %" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts);
  } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
    SStreamRawScanInfo* pInfo = pOperator->info;
    SSnapContext*       sContext = pInfo->sContext;
    if (setForSnapShot(sContext, pOffset->uid) != 0) {
      qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
      return -1;
    }
    qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts);
  } else if (pOffset->type == TMQ_OFFSET__LOG) {
    SStreamRawScanInfo* pInfo = pOperator->info;
    tsdbReaderClose(pInfo->dataReader);
    pInfo->dataReader = NULL;
    qDebug("tmqsnap qStreamPrepareScan snapshot log");
  }
  return 0;
}