executor.c 34.8 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */

#include "executor.h"
H
Haojun Liao 已提交
17
#include "executorimpl.h"
18
#include "planner.h"
L
Liu Jicong 已提交
19
#include "tdatablock.h"
L
Liu Jicong 已提交
20
#include "tref.h"
21
#include "tudf.h"
L
Liu Jicong 已提交
22
#include "vnode.h"
23 24 25 26 27 28 29 30 31

static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t             exchangeObjRefPool = -1;

// Open the reference pool used for exchange operator objects; entries are released through
// doDestroyExchangeOperatorInfo. Run through taosThreadOnce(&initPoolOnce, ...) by the task creation path.
static void initRefPool() {
  int32_t poolId = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
  exchangeObjRefPool = poolId;
}

// Swap the current pool id out (0 is left behind) and close the pool it referred to.
// NOTE(review): assumes atomic_val_compare_exchange_32 returns the previous value — confirm in os atomic header.
static void cleanupRefPool() {
  int32_t closingId = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
  taosCloseRef(closingId);
}
32

L
Liu Jicong 已提交
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
/*
 * Walk down the operator tree from pOperator to the stream scan operator and queue SMA input on it.
 *
 * Every operator visited is reset to OP_NOT_OPENED. Intermediate operators must have exactly one downstream
 * (join-like trees are rejected).
 *
 * type selects how input is interpreted:
 *  - STREAM_INPUT__MERGED_SUBMIT: input is an array of numOfBlocks SSubmitReq pointers;
 *  - STREAM_INPUT__DATA_SUBMIT:   input itself is the single submit request;
 *  - STREAM_INPUT__DATA_BLOCK:    input is an array of numOfBlocks SSDataBlock structs; a pointer to each is queued.
 * Any other type queues nothing and still returns TSDB_CODE_SUCCESS.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR when no single-path stream scan operator can be reached.
 */
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  ASSERT(pOperator != NULL);
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }
    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  } else {
    pOperator->status = OP_NOT_OPENED;

    SStreamScanInfo* pInfo = pOperator->info;

    if (type == STREAM_INPUT__MERGED_SUBMIT) {
      for (size_t i = 0; i < numOfBlocks; i++) {
        SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
        taosArrayPush(pInfo->pBlockLists, &pReq);
      }
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_SUBMIT) {
      taosArrayPush(pInfo->pBlockLists, &input);
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_BLOCK) {
      for (size_t i = 0; i < numOfBlocks; ++i) {
        SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
        taosArrayPush(pInfo->pBlockLists, &pDataBlock);
      }
      pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
    }

    return TSDB_CODE_SUCCESS;
  }
}

L
Liu Jicong 已提交
73
/*
 * Walk down the operator tree from pOperator to the stream scan operator and queue the given input on it.
 *
 * Every operator visited is reset to OP_NOT_OPENED. Intermediate operators must have exactly one downstream
 * (join-like trees are rejected). On the scan operator, validBlockIndex must be 0 and the block list empty (asserted).
 *
 * type selects how input is interpreted:
 *  - STREAM_INPUT__MERGED_SUBMIT: input is an array of numOfBlocks SSubmitReq pointers;
 *  - STREAM_INPUT__DATA_SUBMIT:   input itself is the single submit request (numOfBlocks must be 1);
 *  - STREAM_INPUT__DATA_BLOCK:    input is an array of numOfBlocks SSDataBlock structs; a pointer to each is queued.
 * Any other type triggers ASSERT(0).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR when no single-path stream scan operator can be reached.
 */
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  ASSERT(pOperator != NULL);
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_QRY_APP_ERROR;
    }
    pOperator->status = OP_NOT_OPENED;
    return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  } else {
    pOperator->status = OP_NOT_OPENED;

    SStreamScanInfo* pInfo = pOperator->info;

    // the previous input must have been fully consumed before new input is queued
    ASSERT(pInfo->validBlockIndex == 0);
    ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0);

    if (type == STREAM_INPUT__MERGED_SUBMIT) {
      for (size_t i = 0; i < numOfBlocks; i++) {
        SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*));
        taosArrayPush(pInfo->pBlockLists, &pReq);
      }
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_SUBMIT) {
      ASSERT(numOfBlocks == 1);
      taosArrayPush(pInfo->pBlockLists, &input);
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_BLOCK) {
      for (size_t i = 0; i < numOfBlocks; ++i) {
        SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
        taosArrayPush(pInfo->pBlockLists, &pDataBlock);
      }
      pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
    } else {
      ASSERT(0);
    }

    return TSDB_CODE_SUCCESS;
  }
}

120 121
static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); }

L
Liu Jicong 已提交
122
/*
 * Hand numOfBlocks stream input items of the given type to the task's stream scan operator (see doSetStreamBlock).
 *
 * Returns TSDB_CODE_QRY_APP_ERROR for a NULL task, TSDB_CODE_SUCCESS without doing anything when there is no input,
 * and otherwise the result of doSetStreamBlock.
 */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  bool noInput = (pBlocks == NULL) || (numOfBlocks == 0);
  if (noInput) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetStreamBlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTask));
  }
  return code;
}

L
Liu Jicong 已提交
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
/*
 * Hand numOfBlocks SMA input items of the given type to the task's stream scan operator (see doSetSMABlock).
 *
 * Returns TSDB_CODE_QRY_APP_ERROR for a NULL task, TSDB_CODE_SUCCESS without doing anything when there is no input,
 * and otherwise the result of doSetSMABlock.
 */
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  bool noInput = (pBlocks == NULL) || (numOfBlocks == 0);
  if (noInput) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetSMABlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTask));
  }
  return code;
}

H
Haojun Liao 已提交
164
/*
 * Create a task that runs in queue mode (OPTR_EXEC_MODEL_QUEUE).
 *
 * With msg == NULL a bare task whose root is a raw scan operator is built; numOfCols and pSchema are not touched.
 * Otherwise msg is decoded into a subplan and a task is created from it; *numOfCols receives the number of output
 * slots of the plan's root node and *pSchema a clone of the task's schemaInfo.qsw.
 *
 * Returns the new task, or NULL with terrno set on failure.
 */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
  if (msg == NULL) {
    // create raw scan
    SExecTaskInfo* pRawTask = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
    if (pRawTask == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }

    setTaskStatus(pRawTask, TASK_NOT_COMPLETED);
    pRawTask->cost.created = taosGetTimestampMs();
    pRawTask->execModel = OPTR_EXEC_MODEL_QUEUE;

    pRawTask->pRoot = createRawScanOperatorInfo(readers, pRawTask);
    if (pRawTask->pRoot != NULL) {
      return pRawTask;
    }

    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pRawTask);
    return NULL;
  }

  struct SSubplan* pPlan = NULL;
  int32_t          code = qStringToSubplan(msg, &pPlan);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTaskInfo = NULL;
  code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
  if (code != TSDB_CODE_SUCCESS) {
    // release the plan and whatever part of the task was built
    nodesDestroyNode((SNode*)pPlan);
    qDestroyTask(pTaskInfo);
    terrno = code;
    return NULL;
  }

  // count the slots of the root node that are marked as output
  SDataBlockDescNode* pOutputDesc = pPlan->pNode->pOutputDataBlockDesc;
  int32_t             outputCols = 0;
  SNode*              pSlotNode = NULL;
  FOREACH(pSlotNode, pOutputDesc->pSlots) {
    if (((SSlotDescNode*)pSlotNode)->output) {
      outputCols += 1;
    }
  }
  *numOfCols = outputCols;

  *pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);
  return pTaskInfo;
}

L
Liu Jicong 已提交
218
/*
 * Create a stream-mode (OPTR_EXEC_MODEL_STREAM) task from a serialized subplan.
 *
 * Returns NULL for a NULL msg (terrno is not touched). Returns NULL with terrno set when decoding the plan or
 * creating the task fails. Otherwise returns the new task.
 */
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
  if (msg == NULL) {
    return NULL;
  }

  struct SSubplan* pPlan = NULL;
  int32_t          code = qStringToSubplan(msg, &pPlan);
  if (code == TSDB_CODE_SUCCESS) {
    qTaskInfo_t pTaskInfo = NULL;
    code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
    if (code == TSDB_CODE_SUCCESS) {
      return pTaskInfo;
    }

    // task creation failed: release the plan and whatever part of the task was built
    nodesDestroyNode((SNode*)pPlan);
    qDestroyTask(pTaskInfo);
  }

  terrno = code;
  return NULL;
}
243

244
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
245 246 247 248 249 250
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));

  // let's discard the tables those are not created according to the queried super table.
  SMetaReader mr = {0};
  metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
  for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
251
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
252 253 254

    int32_t code = metaGetTableEntryByUid(&mr, *id);
    if (code != TSDB_CODE_SUCCESS) {
255
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
256 257 258
      continue;
    }

M
Minglei Jin 已提交
259 260
    tDecoderClear(&mr.coder);

261
    // TODO handle ntb case
L
Liu Jicong 已提交
262
    if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
263 264
      continue;
    }
265 266 267

    if (pScanInfo->pTagCond != NULL) {
      bool          qualified = false;
268
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
H
Haojun Liao 已提交
269
      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.meta, &qualified);
270 271 272 273 274 275 276 277 278 279
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
        continue;
      }

      if (!qualified) {
        continue;
      }
    }

280
    // handle multiple partition
281 282 283 284 285 286 287
    taosArrayPush(qa, id);
  }

  metaReaderClear(&mr);
  return qa;
}

288
/*
 * Add (isAdd == true) or remove (isAdd == false) the given table uids on the task's stream scan operator.
 *
 * Adding works in three steps:
 *  1. Filter the uids with filterUnqualifiedTables.
 *  2. Register the survivors with the tq reader.
 *  3. Append each to the task's table list. The group id is the uid itself when grouping by tbname, otherwise it is
 *     derived from the group-by tag values; it is 0 when there are no group tags.
 * Removing forwards tableIdList to tqReaderRemoveTbUidList.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. Returns TSDB_CODE_QRY_APP_ERROR when the operator tree
 * has no reachable stream scan operator.
 */
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  if (isAdd) {
    qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
  }

  // traverse to the stream scanner node to add this table id; stop instead of dereferencing a missing downstream
  SOperatorInfo* pInfo = pTaskInfo->pRoot;
  while (pInfo != NULL && pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    pInfo = (pInfo->numOfDownstream > 0) ? pInfo->pDownstream[0] : NULL;
  }

  if (pInfo == NULL) {
    qError("failed to find stream scan operator to update the table list, %s", GET_TASKID(pTaskInfo));
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SStreamScanInfo* pScanInfo = pInfo->info;

  if (!isAdd) {  // remove the table id in current list
    qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
    return tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
  }

  // add new table id
  SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
  if (qa == NULL) {
    return (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfQualifiedTables = taosArrayGetSize(qa);
  qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables);

  // todo refactor STableList
  // Allocate the tag key buffer before the reader is modified, so an allocation failure leaves the reader unchanged.
  bool   assignUid = false;
  size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
  char*  keyBuf = NULL;
  if (bufLen > 0) {
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      taosArrayDestroy(qa);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
  if (code != TSDB_CODE_SUCCESS) {
    if (keyBuf != NULL) {
      taosMemoryFree(keyBuf);
    }
    taosArrayDestroy(qa);
    return code;
  }

  STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;

  for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
    uint64_t*     uid = taosArrayGet(qa, i);
    STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};

    if (bufLen > 0) {
      if (assignUid) {
        keyInfo.groupId = keyInfo.uid;
      } else {
        code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                     &keyInfo.groupId);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFree(keyBuf);
          taosArrayDestroy(qa);
          return code;
        }
      }
    }

    tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
  }

  if (keyBuf != NULL) {
    taosMemoryFree(keyBuf);
  }

  taosArrayDestroy(qa);
  return code;
}
376

377
/*
 * Report the schema version, table version, database name and table name recorded in the task's schemaInfo.
 *
 * When no schema wrapper is recorded (schemaInfo.sw == NULL), none of the output parameters are written.
 * A missing db/table name yields an empty string. Names are copied with strcpy, so the caller's buffers must be large
 * enough for the stored names (NOTE(review): required sizes are not visible here).
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
                                    int32_t* tversion) {
  ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  if (pTask->schemaInfo.sw == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *sversion = pTask->schemaInfo.sw->version;
  *tversion = pTask->schemaInfo.tversion;

  if (pTask->schemaInfo.dbname == NULL) {
    dbName[0] = 0;
  } else {
    strcpy(dbName, pTask->schemaInfo.dbname);
  }

  if (pTask->schemaInfo.tablename == NULL) {
    tableName[0] = 0;
  } else {
    strcpy(tableName, pTask->schemaInfo.tablename);
  }

  return TSDB_CODE_SUCCESS;
}
401 402

int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
D
dapan1121 已提交
403
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
404 405 406 407 408 409
  assert(pSubplan != NULL);
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;

  taosThreadOnce(&initPoolOnce, initRefPool);
  atexit(cleanupRefPool);

410
  qDebug("start to create subplan task, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
411

412 413
  int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
  if (code != TSDB_CODE_SUCCESS) {
414
    qError("failed to createExecTaskInfoImpl, code: %s", tstrerror(code));
415 416 417
    goto _error;
  }

418
  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
419 420
  code = dsDataSinkMgtInit(&cfg);
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
421
    qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
422 423 424 425 426 427 428
    goto _error;
  }

  if (handle) {
    void* pSinkParam = NULL;
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
429
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
430 431 432
      goto _error;
    }

H
Haojun Liao 已提交
433
    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
L
Liu Jicong 已提交
434
    if (code != TSDB_CODE_SUCCESS) {
wmmhello's avatar
wmmhello 已提交
435 436
      taosMemoryFreeClear(pSinkParam);
    }
437 438
  }

439
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
440

441
  _error:
442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

#ifdef TEST_IMPL
/*
 * Test-only helper (compiled with TEST_IMPL): honour a delay encoded in the SQL text as " t_<N>[h|m|s]".
 *
 * N is parsed as a decimal number. A trailing 'h', 'm' or 's' scales it to hours, minutes or seconds; otherwise it is
 * taken as milliseconds. The resulting delay is clamped to INT32_MAX ms. Queries containing " count(*)" never wait.
 * Delays of one second or more are slept in 1-second steps so that a killed task stops waiting early.
 *
 * Returns 0 when no wait is performed (count query, missing, zero or negative delay). Otherwise returns 1, which
 * also covers the case where pQInfo->sql is NULL.
 */
int waitMoment(SQInfo* pQInfo) {
  if (pQInfo->sql) {
    int   ms = 0;
    char* pcnt = strstr(pQInfo->sql, " count(*)");
    if (pcnt) return 0;

    char* pos = strstr(pQInfo->sql, " t_");
    if (pos) {
      pos += 3;
      char* end = NULL;
      long  value = strtol(pos, &end, 10);
      if (value > 0) {
        // clamp before scaling so the 64-bit product cannot overflow, then clamp the product to int range
        int64_t delay = (value > INT32_MAX) ? (int64_t)INT32_MAX : (int64_t)value;
        char    unit_char = *end;
        if (unit_char == 'h') {
          delay *= 3600 * 1000;
        } else if (unit_char == 'm') {
          delay *= 60 * 1000;
        } else if (unit_char == 's') {
          delay *= 1000;
        }
        ms = (delay > INT32_MAX) ? INT32_MAX : (int)delay;
      }
    }
    if (ms == 0) return 0;
    printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);

    if (ms < 1000) {
      taosMsleep(ms);
    } else {
      int64_t used_ms = 0;  // 64-bit: used_ms can step past INT32_MAX when ms is close to it
      while (used_ms < ms) {
        taosMsleep(1000);
        used_ms += 1000;
        if (isTaskKilled(pQInfo)) {
          printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
          break;
        }
      }
    }
  }
  return 1;
}
#endif

H
Haojun Liao 已提交
491
static void freeBlock(void* param) {
492
  SSDataBlock* pBlock = *(SSDataBlock**)param;
H
Haojun Liao 已提交
493 494 495
  blockDataDestroy(pBlock);
}

496
/*
 * Execute the task and collect its output blocks into pResList.
 *
 * If pLocal is given it is copied into the task's localFetch settings. pResList is cleared first, and the blocks it
 * held are destroyed. Each block returned by the root operator is deep-copied (createOneDataBlock(..., true)) into
 * pResList. Collection stops when the root returns NULL or once EXEC_TASK_OPT_ROWS_THRESHOLD rows have been gathered.
 * *hasMore is true when collection stopped before the root returned NULL. *useconds receives the accumulated
 * execution time only once the root is exhausted.
 *
 * Only one thread may execute a task at a time. Ownership is claimed with a CAS on pTaskInfo->owner; a concurrent
 * caller gets TSDB_CODE_QRY_IN_EXEC. A killed task returns TSDB_CODE_SUCCESS without running. An error delivered via
 * longjmp to pTaskInfo->env is recorded in pTaskInfo->code and returned.
 *
 * Returns pTaskInfo->code.
 */
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) {
  // stop pulling blocks once at least this many rows have been gathered in one call
  enum { EXEC_TASK_OPT_ROWS_THRESHOLD = 4096 };

  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClearEx(pResList, freeBlock);

  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;

  int64_t st = taosGetTimestampUs();

  while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
    SSDataBlock* p = createOneDataBlock(pRes, true);
    if (p == NULL) {
      // keep the blocks collected so far and report the allocation failure instead of dereferencing NULL
      qError("%s failed to copy the result data block, code:%s", GET_TASKID(pTaskInfo),
             tstrerror(TSDB_CODE_OUT_OF_MEMORY));
      pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
      break;
    }

    current += p->info.rows;
    ASSERT(p->info.rows > 0);
    taosArrayPush(pResList, &p);

    if (current >= EXEC_TASK_OPT_ROWS_THRESHOLD) {
      break;
    }
  }

  *hasMore = (pRes != NULL);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

/*
 * Pull a single result block from the task's root operator.
 *
 * *pRes is set to the block returned by the root, or NULL when the task is exhausted. *useconds receives the
 * accumulated execution time only when no block was returned.
 *
 * Only one thread may execute a task at a time. Ownership is claimed with a CAS on pTaskInfo->owner; a concurrent
 * caller gets TSDB_CODE_QRY_IN_EXEC. A killed task returns TSDB_CODE_SUCCESS without running. An error delivered via
 * longjmp to pTaskInfo->env is recorded in pTaskInfo->code and returned.
 *
 * Returns pTaskInfo->code.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  // total is unsigned, so it is printed with PRIu64
  qDebug("%s task suspended, %d rows returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

/*
 * Mark the task as killed (setTaskKilled). Running executions are expected to notice the flag.
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  if (pTaskInfo != NULL) {
    qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
    setTaskKilled(pTaskInfo);
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_QRY_INVALID_QHANDLE;
}

void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);

  queryCostStatis(pTaskInfo);  // print the query cost summary
  doDestroyTask(pTaskInfo);
}

H
Haojun Liao 已提交
651
// Collect EXPLAIN execution info for the task's operator tree (starting at its root) into pExecInfoList.
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOperatorExplainExecInfo(pRoot, pExecInfoList);
}

/*
 * Serialize the state of the task's operator tree into a buffer returned through *pOutput / *len.
 *
 * When encodeOperator reports no operator with a value (nOptrWithVal == 0), the buffer is freed and *pOutput and
 * *len are reset to NULL and 0.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL task, a task without a root operator, or NULL output pointers.
 * Otherwise returns the result of encodeOperator.
 */
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL || pTaskInfo->pRoot == NULL || pOutput == NULL || len == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t nOptrWithVal = 0;
  int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
  if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
    taosMemoryFreeClear(*pOutput);
    *len = 0;
  }
  return code;
}

/*
 * Restore the operator-tree state of a task from a buffer produced by qSerializeTaskStatus.
 * Returns TSDB_CODE_INVALID_PARA for a NULL task, a NULL input or a zero length; otherwise the result of
 * decodeOperator.
 */
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
  bool           invalid = (pTaskInfo == NULL) || (pInput == NULL) || (len == 0);

  return invalid ? TSDB_CODE_INVALID_PARA : decodeOperator(pTaskInfo->pRoot, pInput, len);
}

/*
 * Find the stream scan operator by following the first downstream of each operator from the task root, and store
 * its info pointer in *scanner.
 *
 * Every operator above the scan is expected to have exactly one downstream (asserted). If the chain ends or branches
 * before a stream scan is found, *scanner is left untouched and TSDB_CODE_QRY_APP_ERROR is returned. This avoids
 * walking past the end of pDownstream when ASSERT is compiled out.
 *
 * Returns 0 on success.
 */
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (pOperator != NULL) {
    uint16_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return 0;
    }

    ASSERT(pOperator->numOfDownstream == 1);
    if (pOperator->numOfDownstream != 1) {
      break;
    }
    pOperator = pOperator->pDownstream[0];
  }

  qError("%s failed to find stream scan operator", GET_TASKID(pTaskInfo));
  return TSDB_CODE_QRY_APP_ERROR;
}

// NOTE(review): disabled entry point (not compiled). It wrote a stream item directly onto the task's input queue
// via taosWriteQitem. Kept only for reference.
#if 0
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
  taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
  return 0;
}
#endif

706
/*
 * Enter recover step PREPARE1 for a stream task: the recover version range becomes [0, ver].
 * Always returns 0.
 */
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);

  pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1;
  pTaskInfo->streamInfo.recoverStartVer = 0;
  pTaskInfo->streamInfo.recoverEndVer = ver;
  return 0;
}

/*
 * Enter recover step PREPARE2 for a stream task. The new range starts where the previous one ended and ends at ver.
 * Always returns 0.
 */
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);

  int64_t previousEnd = pTaskInfo->streamInfo.recoverEndVer;
  pTaskInfo->streamInfo.recoverStartVer = previousEnd;
  pTaskInfo->streamInfo.recoverEndVer = ver;
  pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2;
  return 0;
}

// Leave recover mode for a stream task (recover step back to NONE). Always returns 0.
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
  SExecTaskInfo* pStreamTask = (SExecTaskInfo*)tinfo;
  ASSERT(pStreamTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pStreamTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
  return 0;
}

/*
 * Prepare the stream window operators for recovery.
 *
 * The walk follows the single-downstream chain from the task root. For each stream interval, session or state
 * operator it meets, the operator's current calTrigger and deleteMark are saved in pTaskInfo->streamInfo. The
 * operator is then switched to STREAM_TRIGGER_AT_ONCE with deleteMark = INT64_MAX. Only one saved slot exists, so a
 * later matching operator overwrites what an earlier one saved.
 *
 * Returns 0 when the chain ends. Returns -1 (after ASSERT) if an operator has more than one downstream.
 */
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  for (SOperatorInfo* pOperator = pTaskInfo->pRoot;; pOperator = pOperator->pDownstream[0]) {
    switch (pOperator->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
        SStreamIntervalOperatorInfo* pIntervalInfo = pOperator->info;
        pTaskInfo->streamInfo.triggerSaved = pIntervalInfo->twAggSup.calTrigger;
        pTaskInfo->streamInfo.deleteMarkSaved = pIntervalInfo->twAggSup.deleteMark;
        pIntervalInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pIntervalInfo->twAggSup.deleteMark = INT64_MAX;
        break;
      }
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
        SStreamSessionAggOperatorInfo* pSessionInfo = pOperator->info;
        pTaskInfo->streamInfo.triggerSaved = pSessionInfo->twAggSup.calTrigger;
        pTaskInfo->streamInfo.deleteMarkSaved = pSessionInfo->twAggSup.deleteMark;
        pSessionInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pSessionInfo->twAggSup.deleteMark = INT64_MAX;
        break;
      }
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
        SStreamStateAggOperatorInfo* pStateInfo = pOperator->info;
        pTaskInfo->streamInfo.triggerSaved = pStateInfo->twAggSup.calTrigger;
        pTaskInfo->streamInfo.deleteMarkSaved = pStateInfo->twAggSup.deleteMark;
        pStateInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
        pStateInfo->twAggSup.deleteMark = INT64_MAX;
        break;
      }
      default:
        break;
    }

    // iterate operator tree: only a single, non-NULL downstream continues the walk
    if (pOperator->numOfDownstream > 1) {
      qError("unexpected stream, multiple downstream");
      ASSERT(0);
      return -1;
    }
    if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
      return 0;
    }
  }

  return 0;
}

/*
 * Undo qStreamSetParamForRecover.
 *
 * Each stream interval, session or state operator on the single-downstream chain from the task root gets calTrigger
 * and deleteMark set back to the values saved in pTaskInfo->streamInfo.
 *
 * Returns 0 when the chain ends. Returns -1 (after ASSERT) if an operator has more than one downstream.
 */
int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  for (SOperatorInfo* pOperator = pTaskInfo->pRoot;; pOperator = pOperator->pDownstream[0]) {
    switch (pOperator->operatorType) {
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: {
        SStreamIntervalOperatorInfo* pIntervalInfo = pOperator->info;
        pIntervalInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved;
        pIntervalInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
        break;
      }
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: {
        SStreamSessionAggOperatorInfo* pSessionInfo = pOperator->info;
        pSessionInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved;
        pSessionInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
        break;
      }
      case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
        SStreamStateAggOperatorInfo* pStateInfo = pOperator->info;
        pStateInfo->twAggSup.calTrigger = pTaskInfo->streamInfo.triggerSaved;
        pStateInfo->twAggSup.deleteMark = pTaskInfo->streamInfo.deleteMarkSaved;
        break;
      }
      default:
        break;
    }

    // iterate operator tree: only a single, non-NULL downstream continues the walk
    if (pOperator->numOfDownstream > 1) {
      qError("unexpected stream, multiple downstream");
      ASSERT(0);
      return -1;
    }
    if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
      return 0;
    }
  }

  return 0;
}

void* qExtractReaderFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return (void*)pInfo->tqReader;
}

wmmhello's avatar
wmmhello 已提交
822 823 824 825 826 827 828 829
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.schema;
}

/*
 * Return the table name recorded in the task's stream info.
 * qStreamPrepareScan overwrites it with the current snapshot table's name.
 */
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.tbName;
}

wmmhello's avatar
wmmhello 已提交
832
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
833 834
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
wmmhello's avatar
wmmhello 已提交
835
  return &pTaskInfo->streamInfo.metaRsp;
836 837
}

wmmhello's avatar
wmmhello 已提交
838 839 840 841 842 843
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
  return pTaskInfo->streamInfo.prepareStatus.uid;
}

844 845 846 847 848 849 850
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
  memcpy(pOffset, &pTaskInfo->streamInfo.lastStatus, sizeof(STqOffsetVal));
  return 0;
}

H
Haojun Liao 已提交
851
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
852 853
  memset(pCond, 0, sizeof(SQueryTableDataCond));
  pCond->order = TSDB_ORDER_ASC;
H
Haojun Liao 已提交
854
  pCond->numOfCols = pMtInfo->schema->nCols;
855 856 857 858 859 860 861
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
  if (pCond->colList == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
H
Haojun Liao 已提交
862
  pCond->suid = pMtInfo->suid;
863 864 865 866 867
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
H
Haojun Liao 已提交
868 869 870
    pCond->colList[i].type = pMtInfo->schema->pSchema[i].type;
    pCond->colList[i].bytes = pMtInfo->schema->pSchema[i].bytes;
    pCond->colList[i].colId = pMtInfo->schema->pSchema[i].colId;
871 872 873 874 875
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
876 877 878 879 880 881 882 883
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
  ASSERT(pTaskInfo->streamInfo.pReq == NULL);
  pTaskInfo->streamInfo.pReq = pReq;
  return 0;
}

884
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
885 886 887 888
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
  pTaskInfo->streamInfo.prepareStatus = *pOffset;
889
  pTaskInfo->streamInfo.returned = 0;
890 891 892 893
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
    return 0;
  }
  if (subType == TOPIC_SUB_TYPE__COLUMN) {
wmmhello's avatar
wmmhello 已提交
894
    uint16_t type = pOperator->operatorType;
895 896 897 898 899 900
    pOperator->status = OP_OPENED;
    // TODO add more check
    if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      ASSERT(pOperator->numOfDownstream == 1);
      pOperator = pOperator->pDownstream[0];
    }
L
Liu Jicong 已提交
901

902 903 904 905 906
    SStreamScanInfo* pInfo = pOperator->info;
    if (pOffset->type == TMQ_OFFSET__LOG) {
      STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
      tsdbReaderClose(pTSInfo->dataReader);
      pTSInfo->dataReader = NULL;
907
#if 0
wmmhello's avatar
wmmhello 已提交
908 909 910 911 912 913
      if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
          pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
        qError("prepare scan ver %" PRId64 " actual ver %" PRId64 ", last %" PRId64, pOffset->version,
               pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
        ASSERT(0);
      }
914
#endif
915 916 917 918 919 920 921 922 923 924
      if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
        return -1;
      }
      ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;

      if (uid == 0) {
H
Haojun Liao 已提交
925 926
        if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) {
          STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
927 928 929
          uid = pTableInfo->uid;
          ts = INT64_MIN;
        } else {
L
Liu Jicong 已提交
930 931
          return -1;
        }
932
      }
H
Haojun Liao 已提交
933

934 935 936
      /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
      /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
937
      int32_t         numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
H
Haojun Liao 已提交
938 939

#ifndef NDEBUG
wmmhello's avatar
wmmhello 已提交
940 941
      qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
             pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows);
942
      pInfo->pTableScanOp->resultInfo.totalRows = 0;
H
Haojun Liao 已提交
943 944
#endif

945
      bool found = false;
946
      for (int32_t i = 0; i < numOfTables; i++) {
H
Haojun Liao 已提交
947
        STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
948 949 950 951
        if (pTableInfo->uid == uid) {
          found = true;
          pTableScanInfo->currentTable = i;
          break;
L
Liu Jicong 已提交
952
        }
953
      }
954

L
Liu Jicong 已提交
955
      // TODO after dropping table, table may not found
956
      ASSERT(found);
957

958
      if (pTableScanInfo->dataReader == NULL) {
H
Haojun Liao 已提交
959 960
        STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
        int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
961 962 963

        if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num,
                           &pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) {
964
          ASSERT(0);
L
Liu Jicong 已提交
965
        }
966
      }
H
Haojun Liao 已提交
967

968 969
      STableKeyInfo tki = {.uid = uid};
      tsdbSetTableList(pTableScanInfo->dataReader, &tki, 1);
970 971 972 973 974
      int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
      pTableScanInfo->cond.twindows.skey = ts + 1;
      tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
      pTableScanInfo->cond.twindows.skey = oldSkey;
      pTableScanInfo->scanTimes = 0;
975

wmmhello's avatar
wmmhello 已提交
976
      qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
977
             ts, pTableScanInfo->currentTable, numOfTables);
978 979 980
      /*}*/
    } else {
      ASSERT(0);
981
    }
L
Liu Jicong 已提交
982
  } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
983
    SStreamRawScanInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
984 985 986
    SSnapContext*       sContext = pInfo->sContext;
    if (setForSnapShot(sContext, pOffset->uid) != 0) {
      qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid);
987 988 989 990 991 992
      return -1;
    }

    SMetaTableInfo mtInfo = getUidfromSnapShot(sContext);
    tsdbReaderClose(pInfo->dataReader);
    pInfo->dataReader = NULL;
H
Haojun Liao 已提交
993

994
    cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
H
Haojun Liao 已提交
995 996 997 998 999
    tableListClear(pTaskInfo->pTableInfoList);

    if (mtInfo.uid == 0) {
      return 0;  // no data
    }
1000

H
Haojun Liao 已提交
1001
    initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
1002
    pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
1003

H
Haojun Liao 已提交
1004 1005 1006 1007 1008
    if (pTaskInfo->pTableInfoList == NULL)  {
      pTaskInfo->pTableInfoList = tableListCreate();
    }

    tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0);
1009

H
Haojun Liao 已提交
1010 1011
    STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
    int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
H
Haojun Liao 已提交
1012
    ASSERT(size == 1);
1013

H
Haojun Liao 已提交
1014
    tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->dataReader, NULL);
1015

1016
    cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
L
Liu Jicong 已提交
1017 1018 1019 1020
    strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
    tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
    pTaskInfo->streamInfo.schema = mtInfo.schema;

1021
    qDebug("tmqsnap qStreamPrepareScan snapshot data uid %" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts);
L
Liu Jicong 已提交
1022
  } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
wmmhello's avatar
wmmhello 已提交
1023
    SStreamRawScanInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1024 1025
    SSnapContext*       sContext = pInfo->sContext;
    if (setForSnapShot(sContext, pOffset->uid) != 0) {
H
Haojun Liao 已提交
1026
      qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
wmmhello's avatar
wmmhello 已提交
1027 1028
      return -1;
    }
1029
    qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts);
L
Liu Jicong 已提交
1030
  } else if (pOffset->type == TMQ_OFFSET__LOG) {
wmmhello's avatar
wmmhello 已提交
1031 1032 1033
    SStreamRawScanInfo* pInfo = pOperator->info;
    tsdbReaderClose(pInfo->dataReader);
    pInfo->dataReader = NULL;
wmmhello's avatar
wmmhello 已提交
1034
    qDebug("tmqsnap qStreamPrepareScan snapshot log");
1035 1036 1037
  }
  return 0;
}