executor.c 48.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14 15 16
 */

#include "executor.h"
17 18
#include <libs/transport/trpc.h>
#include <libs/wal/wal.h>
19 20
#include "executorInt.h"
#include "operator.h"
21
#include "planner.h"
22
#include "querytask.h"
L
Liu Jicong 已提交
23
#include "tdatablock.h"
L
Liu Jicong 已提交
24
#include "tref.h"
25
#include "tudf.h"
26 27

#include "storageapi.h"
28 29 30 31 32 33 34 35

static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t             exchangeObjRefPool = -1;

static void cleanupRefPool() {
  int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
  taosCloseRef(ref);
}
36

X
Xiaoyu Wang 已提交
37 38
static void initRefPool() {
  exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
D
dapan1121 已提交
39 40 41
  atexit(cleanupRefPool);
}

L
Liu Jicong 已提交
42 43 44 45
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
S
Shengliang Guan 已提交
46
      return TSDB_CODE_APP_ERROR;
L
Liu Jicong 已提交
47 48 49 50
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s" PRIx64, id);
S
Shengliang Guan 已提交
51
      return TSDB_CODE_APP_ERROR;
L
Liu Jicong 已提交
52 53 54 55 56 57 58 59 60 61
    }
    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  } else {
    pOperator->status = OP_NOT_OPENED;

    SStreamScanInfo* pInfo = pOperator->info;

    if (type == STREAM_INPUT__MERGED_SUBMIT) {
      for (int32_t i = 0; i < numOfBlocks; i++) {
K
kailixu 已提交
62 63
        SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
        taosArrayPush(pInfo->pBlockLists, pReq);
L
Liu Jicong 已提交
64 65 66 67 68 69 70 71
      }
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_SUBMIT) {
      taosArrayPush(pInfo->pBlockLists, &input);
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_BLOCK) {
      for (int32_t i = 0; i < numOfBlocks; ++i) {
        SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
K
kailixu 已提交
72 73 74 75
        SPackedData  tmp = {
             .pDataBlock = pDataBlock,
        };
        taosArrayPush(pInfo->pBlockLists, &tmp);
L
Liu Jicong 已提交
76 77 78 79 80 81 82 83
      }
      pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
    }

    return TSDB_CODE_SUCCESS;
  }
}

L
Liu Jicong 已提交
84
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
H
Haojun Liao 已提交
85 86 87 88 89
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
      return TSDB_CODE_APP_ERROR;
    }
L
Liu Jicong 已提交
90

H
Haojun Liao 已提交
91 92 93
    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s" PRIx64, id);
      return TSDB_CODE_APP_ERROR;
L
Liu Jicong 已提交
94
    }
95

H
Haojun Liao 已提交
96 97
    pOperator->status = OP_NOT_OPENED;
    return doSetStreamOpOpen(pOperator->pDownstream[0], id);
L
Liu Jicong 已提交
98 99 100 101
  }
  return 0;
}

102 103 104 105 106 107 108 109 110 111 112
static void clearStreamBlock(SOperatorInfo* pOperator) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 1) {
      return clearStreamBlock(pOperator->pDownstream[0]);
    }
  } else {
    SStreamScanInfo* pInfo = pOperator->info;
    doClearBufferedBlocks(pInfo);
  }
}

5
54liuyao 已提交
113 114 115 116 117 118
void resetTaskInfo(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  pTaskInfo->code = 0;
  clearStreamBlock(pTaskInfo->pRoot);
}

119 120 121 122 123 124
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
  if (pTaskInfo == NULL) {
    return;
  }

125
  qDebug("%s set stream fill-history window:%" PRId64"-%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN, INT64_MAX);
126
  pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
127
  pTaskInfo->streamInfo.fillHistoryWindow.ekey = INT64_MAX;
128 129
}

130
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
X
Xiaoyu Wang 已提交
131
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
H
Haojun Liao 已提交
132
    if (pOperator->numOfDownstream == 0) {
133
      qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
S
Shengliang Guan 已提交
134
      return TSDB_CODE_APP_ERROR;
H
Haojun Liao 已提交
135
    }
H
Haojun Liao 已提交
136

H
Haojun Liao 已提交
137
    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
138
      qError("join not supported for stream block scan, %s" PRIx64, id);
S
Shengliang Guan 已提交
139
      return TSDB_CODE_APP_ERROR;
H
Haojun Liao 已提交
140
    }
L
Liu Jicong 已提交
141
    pOperator->status = OP_NOT_OPENED;
L
Liu Jicong 已提交
142
    return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
H
Haojun Liao 已提交
143
  } else {
144
    pOperator->status = OP_NOT_OPENED;
145
    SStreamScanInfo* pInfo = pOperator->info;
146

147
    qDebug("s-task:%s in this batch, %d blocks need to be processed", id, (int32_t)numOfBlocks);
148
    ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);
149

L
Liu Jicong 已提交
150 151
    if (type == STREAM_INPUT__MERGED_SUBMIT) {
      for (int32_t i = 0; i < numOfBlocks; i++) {
L
Liu Jicong 已提交
152
        SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
L
Liu Jicong 已提交
153
        taosArrayPush(pInfo->pBlockLists, pReq);
154
      }
155

L
Liu Jicong 已提交
156 157
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
    } else if (type == STREAM_INPUT__DATA_SUBMIT) {
L
Liu Jicong 已提交
158
      taosArrayPush(pInfo->pBlockLists, input);
L
Liu Jicong 已提交
159
      pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
L
Liu Jicong 已提交
160
    } else if (type == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
161
      for (int32_t i = 0; i < numOfBlocks; ++i) {
L
Liu Jicong 已提交
162
        SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
163
        SPackedData  tmp = { .pDataBlock = pDataBlock };
L
Liu Jicong 已提交
164
        taosArrayPush(pInfo->pBlockLists, &tmp);
H
Haojun Liao 已提交
165
      }
166

L
Liu Jicong 已提交
167
      pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
168 169
    } else {
      ASSERT(0);
170 171
    }

H
Haojun Liao 已提交
172 173 174 175
    return TSDB_CODE_SUCCESS;
  }
}

176
void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI *pAPI) {
177 178 179
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pStreamScanInfo = pOperator->info;
H
Haojun Liao 已提交
180 181 182
    if (pStreamScanInfo->pTableScanOp != NULL) {
      STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
      if (pScanInfo->base.dataReader != NULL) {
183
        pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str);
H
Haojun Liao 已提交
184
      }
185 186
    }
  } else {
187
    doSetTaskId(pOperator->pDownstream[0], pAPI);
188 189 190
  }
}

H
Haojun Liao 已提交
191 192 193
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
  SExecTaskInfo* pTaskInfo = tinfo;
  pTaskInfo->id.queryId = queryId;
H
Haojun Liao 已提交
194
  buildTaskId(taskId, queryId, pTaskInfo->id.str);
195 196

  // set the idstr for tsdbReader
197
  doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI);
H
Haojun Liao 已提交
198 199
}

wmmhello's avatar
wmmhello 已提交
200 201 202 203
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
//  SExecTaskInfo* pTaskInfo = tinfo;
//  pTaskInfo->code = code;
//}
204

L
Liu Jicong 已提交
205 206
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
  if (tinfo == NULL) {
S
Shengliang Guan 已提交
207
    return TSDB_CODE_APP_ERROR;
L
Liu Jicong 已提交
208 209 210 211 212 213 214 215 216 217 218 219 220
  }

  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  int32_t code = doSetStreamOpOpen(pTaskInfo->pRoot, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
  } else {
    qDebug("%s set the stream block successfully", GET_TASKID(pTaskInfo));
  }

  return code;
}
221

L
liuyao 已提交
222
void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId) {
223
  SExecTaskInfo* pTaskInfo = tinfo;
L
liuyao 已提交
224 225
  *dataVer = pTaskInfo->streamInfo.dataVersion;
  *ckId = pTaskInfo->streamInfo.checkPointId;
226 227 228
}


L
Liu Jicong 已提交
229
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
H
Haojun Liao 已提交
230
  if (tinfo == NULL) {
S
Shengliang Guan 已提交
231
    return TSDB_CODE_APP_ERROR;
H
Haojun Liao 已提交
232 233
  }

H
Haojun Liao 已提交
234
  if (pBlocks == NULL || numOfBlocks == 0) {
H
Haojun Liao 已提交
235 236 237
    return TSDB_CODE_SUCCESS;
  }

L
Liu Jicong 已提交
238
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
H
Haojun Liao 已提交
239

C
Cary Xu 已提交
240
  int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
241
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
242
    qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
243
  } else {
H
Haojun Liao 已提交
244
    qDebug("%s set the stream block successfully", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
245 246 247 248 249
  }

  return code;
}

L
Liu Jicong 已提交
250 251
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
S
Shengliang Guan 已提交
252
    return TSDB_CODE_APP_ERROR;
L
Liu Jicong 已提交
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
  }

  if (pBlocks == NULL || numOfBlocks == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  int32_t code = doSetSMABlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed to set the sma block data", GET_TASKID(pTaskInfo));
  } else {
    qDebug("%s set the sma block successfully", GET_TASKID(pTaskInfo));
  }

  return code;
}

X
Xiaoyu Wang 已提交
271 272 273
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
                                     uint64_t id) {
  if (msg == NULL) {  // create raw scan
274
    SExecTaskInfo* pTaskInfo = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api);
275 276 277 278
    if (NULL == pTaskInfo) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
279

280
    pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo);
L
Liu Jicong 已提交
281
    if (NULL == pTaskInfo->pRoot) {
282 283 284 285
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(pTaskInfo);
      return NULL;
    }
286

287
    pTaskInfo->storageAPI = pReaderHandle->api;
288
    qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pTaskInfo));
289
    return pTaskInfo;
L
Liu Jicong 已提交
290 291
  }

H
Haojun Liao 已提交
292 293
  SSubplan* pPlan = NULL;
  int32_t   code = qStringToSubplan(msg, &pPlan);
L
Liu Jicong 已提交
294 295 296 297 298 299
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTaskInfo = NULL;
300
  code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
L
Liu Jicong 已提交
301
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
302 303
    nodesDestroyNode((SNode*)pPlan);
    qDestroyTask(pTaskInfo);
L
Liu Jicong 已提交
304 305 306 307
    terrno = code;
    return NULL;
  }

308
  // extract the number of output columns
H
Haojun Liao 已提交
309
  SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc;
wmmhello's avatar
wmmhello 已提交
310
  *numOfCols = 0;
311

L
Liu Jicong 已提交
312
  SNode* pNode;
313 314 315
  FOREACH(pNode, pDescNode->pSlots) {
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
    if (pSlotDesc->output) {
wmmhello's avatar
wmmhello 已提交
316
      ++(*numOfCols);
317 318 319
    }
  }

L
Liu Jicong 已提交
320 321 322
  return pTaskInfo;
}

323
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId) {
L
Liu Jicong 已提交
324
  if (msg == NULL) {
325 326 327
    return NULL;
  }

328 329
  SSubplan* pPlan = NULL;
  int32_t   code = qStringToSubplan(msg, &pPlan);
330 331 332 333 334 335
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTaskInfo = NULL;
336
  code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
337
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
338
    nodesDestroyNode((SNode*)pPlan);
H
Haojun Liao 已提交
339
    qDestroyTask(pTaskInfo);
340 341 342 343 344 345
    terrno = code;
    return NULL;
  }

  return pTaskInfo;
}
346

347 348
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,
    SStorageAPI* pAPI) {
349
  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
H
Haojun Liao 已提交
350 351 352 353
  int32_t numOfUids = taosArrayGetSize(tableIdList);
  if (numOfUids == 0) {
    return qa;
  }
354

355 356
  STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;

357
  uint64_t suid = 0;
358
  uint64_t uid = 0;
359
  int32_t type = 0;
360
  tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type);
361

362 363
  // let's discard the tables those are not created according to the queried super table.
  SMetaReader mr = {0};
H
Haojun Liao 已提交
364
  pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, 0, &pAPI->metaFn);
H
Haojun Liao 已提交
365
  for (int32_t i = 0; i < numOfUids; ++i) {
366
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
367

368
    int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, *id);
369
    if (code != TSDB_CODE_SUCCESS) {
370
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
371 372 373
      continue;
    }

M
Minglei Jin 已提交
374 375
    tDecoderClear(&mr.coder);

376
    if (mr.me.type == TSDB_SUPER_TABLE) {
377
      continue;
378 379 380
    } else {
      if (type == TSDB_SUPER_TABLE) {
        // this new created child table does not belong to the scanned super table.
381
        if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) {
382 383 384 385 386
          continue;
        }
      } else {  // ordinary table
        // In case that the scanned target table is an ordinary table. When replay the WAL during restore the vnode, we
        // should check all newly created ordinary table to make sure that this table isn't the destination table.
387
        if (mr.me.uid != uid) {
388 389 390
          continue;
        }
      }
391
    }
392 393 394

    if (pScanInfo->pTagCond != NULL) {
      bool          qualified = false;
395
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
396
      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.vnode, &qualified, pAPI);
397 398 399 400 401 402 403 404 405 406
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
        continue;
      }

      if (!qualified) {
        continue;
      }
    }

407
    // handle multiple partition
408 409 410
    taosArrayPush(qa, id);
  }

411
  pAPI->metaReaderFn.clearReader(&mr);
412 413 414
  return qa;
}

415
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
dengyihao's avatar
dengyihao 已提交
416
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
417 418
  const char*    id = GET_TASKID(pTaskInfo);
  int32_t        code = 0;
H
Haojun Liao 已提交
419 420

  if (isAdd) {
421
    qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);
H
Haojun Liao 已提交
422 423
  }

424
  // traverse to the stream scanner node to add this table id
425
  SOperatorInfo*   pInfo = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
426
  SStreamScanInfo* pScanInfo = pInfo->info;
427

428
  if (isAdd) {  // add new table id
429
    SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, id, &pTaskInfo->storageAPI);
430
    int32_t numOfQualifiedTables = taosArrayGetSize(qa);
431
    qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
432
    code = pTaskInfo->storageAPI.tqReaderFn.tqReaderAddTables(pScanInfo->tqReader, qa);
433
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
434
      taosArrayDestroy(qa);
435 436 437
      return code;
    }

M
Minglei Jin 已提交
438
    bool   assignUid = false;
L
Liu Jicong 已提交
439 440
    size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
    char*  keyBuf = NULL;
441
    if (bufLen > 0) {
442
      assignUid = groupbyTbname(pScanInfo->pGroupTags);
443 444
      keyBuf = taosMemoryMalloc(bufLen);
      if (keyBuf == NULL) {
H
Haojun Liao 已提交
445
        taosArrayDestroy(qa);
446 447 448
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
449

450
    STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo;
451
    taosWLockLatch(&pTaskInfo->lock);
452 453

    for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
454
      uint64_t*     uid = taosArrayGet(qa, i);
455
      STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
456 457

      if (bufLen > 0) {
458 459 460
        if (assignUid) {
          keyInfo.groupId = keyInfo.uid;
        } else {
461
          code = getGroupIdFromTagsVal(pScanInfo->readHandle.vnode, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
462
                                       &keyInfo.groupId, &pTaskInfo->storageAPI);
463
          if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
464
            taosMemoryFree(keyBuf);
H
Haojun Liao 已提交
465
            taosArrayDestroy(qa);
466
            taosWUnLockLatch(&pTaskInfo->lock);
467 468
            return code;
          }
469 470 471
        }
      }

H
Haojun Liao 已提交
472
      tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
473 474
    }

475
    taosWUnLockLatch(&pTaskInfo->lock);
476 477 478 479
    if (keyBuf != NULL) {
      taosMemoryFree(keyBuf);
    }

480 481
    taosArrayDestroy(qa);
  } else {  // remove the table id in current list
482
    qDebug("%d remove child tables from the stream scanner, %s", (int32_t)taosArrayGetSize(tableIdList), id);
483
    taosWLockLatch(&pTaskInfo->lock);
484
    code = pTaskInfo->storageAPI.tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
485
    taosWUnLockLatch(&pTaskInfo->lock);
486 487
  }

488
  return code;
L
fix  
Liu Jicong 已提交
489
}
490

491
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
L
Liu Jicong 已提交
492
                                    int32_t* tversion) {
493
  ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
494
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
495

496
  if (pTaskInfo->schemaInfo.sw == NULL) {
H
Haojun Liao 已提交
497 498 499
    return TSDB_CODE_SUCCESS;
  }

500 501 502 503
  *sversion = pTaskInfo->schemaInfo.sw->version;
  *tversion = pTaskInfo->schemaInfo.tversion;
  if (pTaskInfo->schemaInfo.dbname) {
    strcpy(dbName, pTaskInfo->schemaInfo.dbname);
504 505 506
  } else {
    dbName[0] = 0;
  }
507 508
  if (pTaskInfo->schemaInfo.tablename) {
    strcpy(tableName, pTaskInfo->schemaInfo.tablename);
509 510 511
  } else {
    tableName[0] = 0;
  }
512 513

  return 0;
514
}
515 516

int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
D
dapan1121 已提交
517
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
518 519 520
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
  taosThreadOnce(&initPoolOnce, initRefPool);

521
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
522

523
  int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
524
  if (code != TSDB_CODE_SUCCESS) {
525
    qError("failed to createExecTaskInfo, code: %s", tstrerror(code));
526 527 528
    goto _error;
  }

529
  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
H
Haojun Liao 已提交
530
  code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI);
531
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
532
    qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
533 534 535 536 537
    goto _error;
  }

  if (handle) {
    void* pSinkParam = NULL;
538
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
539
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
540
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
541 542 543
      goto _error;
    }

H
Haojun Liao 已提交
544
    // pSinkParam has been freed during create sinker.
H
Haojun Liao 已提交
545
    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
546 547
  }

548
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
549

dengyihao's avatar
dengyihao 已提交
550
_error:
551 552 553 554
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

H
Haojun Liao 已提交
555
static void freeBlock(void* param) {
556
  SSDataBlock* pBlock = *(SSDataBlock**)param;
H
Haojun Liao 已提交
557 558 559
  blockDataDestroy(pBlock);
}

560
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) {
561 562 563
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

D
dapan1121 已提交
564
  if (pLocal) {
565
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
D
dapan1121 已提交
566
  }
L
Liu Jicong 已提交
567

H
Haojun Liao 已提交
568
  taosArrayClear(pResList);
H
Haojun Liao 已提交
569

570 571 572 573 574 575 576 577
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
578
    pTaskInfo->cost.start = taosGetTimestampUs();
579 580 581
  }

  if (isTaskKilled(pTaskInfo)) {
582
    atomic_store_64(&pTaskInfo->owner, 0);
583 584 585 586 587 588 589 590 591
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
H
Haojun Liao 已提交
592

593 594 595 596 597 598 599 600
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

601
  int32_t      current = 0;
H
Haojun Liao 已提交
602 603
  SSDataBlock* pRes = NULL;

604 605
  int64_t st = taosGetTimestampUs();

H
Haojun Liao 已提交
606
  int32_t blockIndex = 0;
607
  while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
H
Haojun Liao 已提交
608 609 610 611 612 613
    SSDataBlock* p = NULL;
    if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
      SSDataBlock* p1 = createOneDataBlock(pRes, true);
      taosArrayPush(pTaskInfo->pResultBlockList, &p1);
      p = p1;
    } else {
L
Liu Jicong 已提交
614
      p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
H
Haojun Liao 已提交
615 616 617 618 619
      copyDataBlock(p, pRes);
    }

    blockIndex += 1;

H
Haojun Liao 已提交
620 621 622 623 624 625 626 627 628
    current += p->info.rows;
    ASSERT(p->info.rows > 0);
    taosArrayPush(pResList, &p);

    if (current >= 4096) {
      break;
    }
  }

629
  *hasMore = (pRes != NULL);
630 631 632
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
H
Haojun Liao 已提交
633
  if (NULL == pRes) {
634 635 636 637 638
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

H
Haojun Liao 已提交
639
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
H
Haojun Liao 已提交
640
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
641
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);
642 643 644 645 646

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

H
Haojun Liao 已提交
647 648
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
L
Liu Jicong 已提交
649 650 651
  SArray*        pList = pTaskInfo->pResultBlockList;
  size_t         num = taosArrayGetSize(pList);
  for (int32_t i = 0; i < num; ++i) {
H
Haojun Liao 已提交
652 653 654 655 656 657 658
    SSDataBlock** p = taosArrayGet(pTaskInfo->pResultBlockList, i);
    blockDataDestroy(*p);
  }

  taosArrayClear(pTaskInfo->pResultBlockList);
}

659 660 661 662 663 664 665 666 667 668 669 670 671
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
672
    pTaskInfo->cost.start = taosGetTimestampUs();
673 674
  }

5
54liuyao 已提交
675
  if (isTaskKilled(pTaskInfo)) {
676
    clearStreamBlock(pTaskInfo->pRoot);
677
    atomic_store_64(&pTaskInfo->owner, 0);
678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

L
Liu Jicong 已提交
716
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
D
dapan1121 已提交
717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  return TSDB_CODE_SUCCESS;
}

int32_t stopInfoComp(void const* lp, void const* rp) {
  SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp;
  SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp;

  if (key->refId < pInfo->refId) {
    return -1;
  } else if (key->refId > pInfo->refId) {
    return 1;
  }

  return 0;
}

L
Liu Jicong 已提交
737
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
D
dapan1121 已提交
738 739 740 741 742 743 744 745 746 747 748 749 750
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  int32_t idx = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ);
  if (idx >= 0) {
    taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, idx);
  }
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}

void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t num = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
  for (int32_t i = 0; i < num; ++i) {
L
Liu Jicong 已提交
751 752
    SExchangeOpStopInfo* pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i);
    SExchangeInfo*       pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId);
D
dapan1121 已提交
753 754 755 756 757 758 759 760 761
    if (pExchangeInfo) {
      tsem_post(&pExchangeInfo->ready);
      taosReleaseRef(exchangeObjRefPool, pStop->refId);
    }
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}

D
dapan1121 已提交
762
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
763 764 765 766 767 768
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
L
Liu Jicong 已提交
769

D
dapan1121 已提交
770
  setTaskKilled(pTaskInfo, rspCode);
D
dapan1121 已提交
771
  qStopTaskOperators(pTaskInfo);
L
Liu Jicong 已提交
772

773 774 775
  return TSDB_CODE_SUCCESS;
}

776 777 778 779 780 781
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

H
Haojun Liao 已提交
782 783
  qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
  setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
784

X
Xiaoyu Wang 已提交
785
  while (qTaskIsExecuting(pTaskInfo)) {
786 787 788 789 790 791 792
    taosMsleep(10);
  }

  pTaskInfo->code = rspCode;
  return TSDB_CODE_SUCCESS;
}

793 794 795 796 797 798 799 800 801
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  if (NULL == pTaskInfo) {
    return false;
  }

  return 0 != atomic_load_64(&pTaskInfo->owner);
}

H
Haojun Liao 已提交
802 803
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo* pSummary = &pTaskInfo->cost;
804
  int64_t        idleTime = pSummary->start - pSummary->created;
H
Haojun Liao 已提交
805 806 807 808

  SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
  if (pSummary->pRecoder != NULL) {
    qDebug(
809
        "%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
810
        "createGroupIdMap:%.2f ms, total blocks:%d, "
H
Haojun Liao 已提交
811
        "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
812 813 814 815 816 817
        GET_TASKID(pTaskInfo), idleTime / 1000.0, pSummary->elapsedTime / 1000.0, pSummary->extractListTime,
        pSummary->groupIdMapTime, pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks,
        pRecorder->totalRows, pRecorder->totalCheckedRows);
  } else {
    qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idleTime / 1000.0,
           pSummary->elapsedTime / 1000.0);
H
Haojun Liao 已提交
818 819 820
  }
}

821 822 823 824 825 826
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

H
Haojun Liao 已提交
827 828 829 830 831
  if (pTaskInfo->pRoot != NULL) {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
  } else {
    qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
  }
832

H
Haojun Liao 已提交
833
  printTaskExecCostInLog(pTaskInfo);  // print the query cost summary
834 835 836
  doDestroyTask(pTaskInfo);
}

H
Haojun Liao 已提交
837
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
838
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
H
Haojun Liao 已提交
839
  return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList);
840 841 842 843 844 845 846 847 848
}

int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
  if (pTaskInfo->pRoot == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t nOptrWithVal = 0;
L
Liu Jicong 已提交
849 850 851 852 853
  //  int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
  //  if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
  //    taosMemoryFreeClear(*pOutput);
  //    *len = 0;
  //  }
H
Haojun Liao 已提交
854
  return 0;
855 856 857 858 859 860 861 862 863
}

int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;

  if (pTaskInfo == NULL || pInput == NULL || len == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

H
Haojun Liao 已提交
864
  return 0;
L
Liu Jicong 已提交
865
  //  return decodeOperator(pTaskInfo->pRoot, pInput, len);
866 867 868 869 870 871 872
}

int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (1) {
873
    uint16_t type = pOperator->operatorType;
874 875 876 877 878 879 880 881 882 883
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return 0;
    } else {
      ASSERT(pOperator->numOfDownstream == 1);
      pOperator = pOperator->pDownstream[0];
    }
  }
}

L
liuyao 已提交
884
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
885 886
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
887 888 889 890 891 892

  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;

  pStreamInfo->fillHistoryVer = *pVerRange;
  pStreamInfo->fillHistoryWindow = *pWindow;
  pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1;
L
liuyao 已提交
893 894
  pStreamInfo->recoverStep1Finished = false;
  pStreamInfo->recoverStep2Finished = false;
895

896
  qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
L
liuyao 已提交
897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914
         " - %" PRId64,
         GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
         pWindow->ekey);
  return 0;
}

int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);

  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;

  pStreamInfo->fillHistoryVer = *pVerRange;
  pStreamInfo->fillHistoryWindow = *pWindow;
  pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
  pStreamInfo->recoverStep1Finished = true;
  pStreamInfo->recoverStep2Finished = false;

915
  qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
916
         ", window:%" PRId64 " - %" PRId64,
917 918
         GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
         pWindow->ekey);
919 920 921 922 923 924 925 926 927 928
  return 0;
}

int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
  pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
  return 0;
}

929
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
930 931 932 933
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (1) {
934 935 936
    int32_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
        type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
937
      SStreamIntervalOperatorInfo* pInfo = pOperator->info;
938
      STimeWindowAggSupp* pSup = &pInfo->twAggSup;
L
Liu Jicong 已提交
939

940 941
      ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
      ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
L
Liu Jicong 已提交
942

943 944 945 946 947 948
      qInfo("save stream param for interval: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);

      pSup->calTriggerSaved = pSup->calTrigger;
      pSup->deleteMarkSaved = pSup->deleteMark;
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
      pSup->deleteMark = INT64_MAX;
949 950
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
      pInfo->ignoreExpiredData = false;
951 952 953
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
954
      SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
955 956 957 958
      STimeWindowAggSupp* pSup = &pInfo->twAggSup;

      ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
      ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
L
Liu Jicong 已提交
959

960
      qInfo("save stream param for session: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);
L
Liu Jicong 已提交
961

962 963 964 965
      pSup->calTriggerSaved = pSup->calTrigger;
      pSup->deleteMarkSaved = pSup->deleteMark;
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
      pSup->deleteMark = INT64_MAX;
966 967
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
      pInfo->ignoreExpiredData = false;
968
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
969
      SStreamStateAggOperatorInfo* pInfo = pOperator->info;
970 971 972 973
      STimeWindowAggSupp* pSup = &pInfo->twAggSup;

      ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
      ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
L
Liu Jicong 已提交
974

975
      qInfo("save stream param for state: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);
L
Liu Jicong 已提交
976

977 978 979 980
      pSup->calTriggerSaved = pSup->calTrigger;
      pSup->deleteMarkSaved = pSup->deleteMark;
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
      pSup->deleteMark = INT64_MAX;
981 982
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
      pInfo->ignoreExpiredData = false;
983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000
    }

    // iterate operator tree
    if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
      if (pOperator->numOfDownstream > 1) {
        qError("unexpected stream, multiple downstream");
        ASSERT(0);
        return -1;
      }
      return 0;
    } else {
      pOperator = pOperator->pDownstream[0];
    }
  }

  return 0;
}

1001
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
1002
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1003
  const char*    id = GET_TASKID(pTaskInfo);
1004 1005 1006
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (1) {
1007 1008 1009
    uint16_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
        type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
1010
      SStreamIntervalOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1011 1012
      pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
      pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
1013
      pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
1014 1015 1016 1017 1018
      qInfo("%s restore stream agg executors param for interval: %d,  %" PRId64, id, pInfo->twAggSup.calTrigger,
            pInfo->twAggSup.deleteMark);
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
1019
      SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1020 1021
      pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
      pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
1022
      pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
1023 1024 1025
      qInfo("%s restore stream agg executor param for session: %d,  %" PRId64, id, pInfo->twAggSup.calTrigger,
            pInfo->twAggSup.deleteMark);
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
1026
      SStreamStateAggOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1027 1028
      pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
      pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
1029
      pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
1030 1031
      qInfo("%s restore stream agg executor param for state: %d,  %" PRId64, id, pInfo->twAggSup.calTrigger,
            pInfo->twAggSup.deleteMark);
1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044
    }

    // iterate operator tree
    if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
      if (pOperator->numOfDownstream > 1) {
        qError("unexpected stream, multiple downstream");
        return -1;
      }
      return 0;
    } else {
      pOperator = pOperator->pDownstream[0];
    }
  }
1045
}
1046

L
Liu Jicong 已提交
1047 1048 1049 1050
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.recoverScanFinished;
}
1051

L
liuyao 已提交
1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.recoverStep1Finished;
}

bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.recoverStep2Finished;
}

int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  pTaskInfo->streamInfo.recoverStep1Finished = true;
  pTaskInfo->streamInfo.recoverStep2Finished = true;
1066 1067 1068

  // reset the time window
  pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN;
L
liuyao 已提交
1069 1070 1071
  return 0;
}

1072 1073 1074 1075 1076
void* qExtractReaderFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return (void*)pInfo->tqReader;
}

wmmhello's avatar
wmmhello 已提交
1077 1078 1079 1080 1081 1082 1083 1084
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.schema;
}

const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.tbName;
1085 1086
}

wmmhello's avatar
wmmhello 已提交
1087
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
1088
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
wmmhello's avatar
wmmhello 已提交
1089
  return &pTaskInfo->streamInfo.metaRsp;
1090 1091
}

1092
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
wmmhello's avatar
wmmhello 已提交
1093
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1094
  memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal));
1095 1096
}

H
Haojun Liao 已提交
1097
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
1098 1099
  memset(pCond, 0, sizeof(SQueryTableDataCond));
  pCond->order = TSDB_ORDER_ASC;
H
Haojun Liao 已提交
1100
  pCond->numOfCols = pMtInfo->schema->nCols;
1101
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
H
Haojun Liao 已提交
1102 1103 1104 1105
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
  if (pCond->colList == NULL || pCond->pSlotList == NULL) {
    taosMemoryFreeClear(pCond->colList);
    taosMemoryFreeClear(pCond->pSlotList);
S
Shengliang Guan 已提交
1106
    terrno = TSDB_CODE_OUT_OF_MEMORY;
1107 1108 1109
    return terrno;
  }

H
Haojun Liao 已提交
1110
  pCond->twindows = TSWINDOW_INITIALIZER;
H
Haojun Liao 已提交
1111
  pCond->suid = pMtInfo->suid;
1112 1113 1114 1115 1116
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
H
Haojun Liao 已提交
1117 1118 1119 1120 1121 1122
    SColumnInfo* pColInfo = &pCond->colList[i];
    pColInfo->type = pMtInfo->schema->pSchema[i].type;
    pColInfo->bytes = pMtInfo->schema->pSchema[i].bytes;
    pColInfo->colId = pMtInfo->schema->pSchema[i].colId;

    pCond->pSlotList[i] = i;
1123 1124 1125 1126 1127
  }

  return TSDB_CODE_SUCCESS;
}

1128 1129
void qStreamSetOpen(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
X
Xiaoyu Wang 已提交
1130
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
1131 1132 1133
  pOperator->status = OP_NOT_OPENED;
}

1134
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
1135
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1136 1137
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;

1138
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
X
Xiaoyu Wang 已提交
1139
  const char*    id = GET_TASKID(pTaskInfo);
1140

1141 1142 1143 1144 1145 1146 1147 1148 1149 1150
  if(subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG){
    pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
    if (pOperator == NULL) {
      return -1;
    }
    SStreamScanInfo* pInfo = pOperator->info;
    SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
    SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
    walReaderVerifyOffset(pWalReader, pOffset);
  }
wmmhello's avatar
wmmhello 已提交
1151
  // if pOffset equal to current offset, means continue consume
1152
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
1153 1154
    return 0;
  }
1155

1156
  if (subType == TOPIC_SUB_TYPE__COLUMN) {
1157
    pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
wmmhello's avatar
wmmhello 已提交
1158 1159
    if (pOperator == NULL) {
      return -1;
1160
    }
1161

1162
    SStreamScanInfo* pInfo = pOperator->info;
1163 1164
    STableScanInfo*  pScanInfo = pInfo->pTableScanOp->info;
    STableScanBase*  pScanBaseInfo = &pScanInfo->base;
1165
    STableListInfo*  pTableListInfo = pScanBaseInfo->pTableListInfo;
1166

1167
    if (pOffset->type == TMQ_OFFSET__LOG) {
1168
      // todo refactor: move away
1169
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader);
1170 1171
      pScanBaseInfo->dataReader = NULL;

H
Haojun Liao 已提交
1172 1173 1174 1175 1176 1177 1178
      SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
      SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
      walReaderVerifyOffset(pWalReader, pOffset);
      if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) {
        qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
        return -1;
      }
1179
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
1180 1181
      // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
      // those data are from the snapshot in tsdb, besides the data in the wal file.
1182 1183
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;
1184
      int32_t index = 0;
1185

1186 1187 1188 1189
      // this value may be changed if new tables are created
      taosRLockLatch(&pTaskInfo->lock);
      int32_t numOfTables = tableListGetSize(pTableListInfo);

1190
      if (uid == 0) {
1191 1192
        if (numOfTables != 0) {
          STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0);
1193 1194
          uid = pTableInfo->uid;
          ts = INT64_MIN;
1195
          pScanInfo->currentTable = 0;
1196
        } else {
1197 1198
          taosRUnLockLatch(&pTaskInfo->lock);
          qError("no table in table list, %s", id);
1199
          terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
L
Liu Jicong 已提交
1200 1201
          return -1;
        }
1202
      }
H
Haojun Liao 已提交
1203

X
Xiaoyu Wang 已提交
1204 1205
      qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% " PRId64 " rows returned", uid, ts,
             pInfo->pTableScanOp->resultInfo.totalRows);
1206
      pInfo->pTableScanOp->resultInfo.totalRows = 0;
H
Haojun Liao 已提交
1207

1208
      // start from current accessed position
H
Haojun Liao 已提交
1209 1210 1211
      // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
      // position, let's find it from the beginning.
      index = tableListFind(pTableListInfo, uid, 0);
1212
      taosRUnLockLatch(&pTaskInfo->lock);
1213

1214 1215 1216
      if (index >= 0) {
        pScanInfo->currentTable = index;
      } else {
H
Haojun Liao 已提交
1217 1218
        qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
               numOfTables, pScanInfo->currentTable, id);
1219
        terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
wmmhello's avatar
wmmhello 已提交
1220 1221
        return -1;
      }
1222

1223
      STableKeyInfo keyInfo = {.uid = uid};
X
Xiaoyu Wang 已提交
1224
      int64_t       oldSkey = pScanBaseInfo->cond.twindows.skey;
1225 1226 1227

      // let's start from the next ts that returned to consumer.
      pScanBaseInfo->cond.twindows.skey = ts + 1;
H
Haojun Liao 已提交
1228
      pScanInfo->scanTimes = 0;
1229

1230
      if (pScanBaseInfo->dataReader == NULL) {
1231
        int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
1232
                                      pScanInfo->pResBlock, (void**) &pScanBaseInfo->dataReader, id, false, NULL);
1233 1234 1235
        if (code != TSDB_CODE_SUCCESS) {
          qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
          terrno = code;
wmmhello's avatar
wmmhello 已提交
1236
          return -1;
L
Liu Jicong 已提交
1237
        }
1238

H
Haojun Liao 已提交
1239 1240
        qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
1241
      } else {
1242 1243
        pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
        pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
1244
        qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 "  table index:%d numOfTable:%d, %s",
H
Haojun Liao 已提交
1245
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
1246
      }
1247 1248 1249

      // restore the key value
      pScanBaseInfo->cond.twindows.skey = oldSkey;
1250
    } else {
1251
      qError("invalid pOffset->type:%d, %s", pOffset->type, id);
1252
      terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
1253 1254 1255
      return -1;
    }

1256
  } else {  // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
1257

1258 1259 1260
    if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
1261

X
Xiaoyu Wang 已提交
1262
      SOperatorInfo*  p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id);
1263 1264
      STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;

1265
      if (pAPI->snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) {
X
Xiaoyu Wang 已提交
1266
        qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
1267
        terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
1268 1269
        return -1;
      }
H
Haojun Liao 已提交
1270

H
Haojun Liao 已提交
1271
      SMetaTableInfo mtInfo = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext);
1272
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
1273
      pInfo->dataReader = NULL;
H
Haojun Liao 已提交
1274

1275 1276
      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tableListClear(pTableListInfo);
1277

1278
      if (mtInfo.uid == 0) {
1279
        goto end;  // no data
1280
      }
1281

1282 1283
      initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
      pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
H
Haojun Liao 已提交
1284

1285
      tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
1286

1287 1288
      STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
      int32_t        size = tableListGetSize(pTableListInfo);
1289

1290
      pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, (void**) &pInfo->dataReader, NULL,
1291
                     false, NULL);
L
Liu Jicong 已提交
1292

1293 1294
      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
1295
      tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
1296 1297
      pTaskInfo->streamInfo.schema = mtInfo.schema;

X
Xiaoyu Wang 已提交
1298
      qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
1299 1300 1301
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
1302
      if (pTaskInfo->storageAPI.snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) {
1303
        qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
1304
        terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
1305 1306
        return -1;
      }
X
Xiaoyu Wang 已提交
1307 1308
      qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts,
             id);
1309 1310
    } else if (pOffset->type == TMQ_OFFSET__LOG) {
      SStreamRawScanInfo* pInfo = pOperator->info;
1311
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
1312
      pInfo->dataReader = NULL;
1313
      qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
wmmhello's avatar
wmmhello 已提交
1314
    }
1315
  }
1316 1317

end:
wmmhello's avatar
wmmhello 已提交
1318
  pTaskInfo->streamInfo.currentOffset = *pOffset;
1319

1320 1321
  return 0;
}
H
Haojun Liao 已提交
1322 1323 1324

void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
X
Xiaoyu Wang 已提交
1325
  if (pMsg->info.ahandle == NULL) {
wmmhello's avatar
wmmhello 已提交
1326 1327 1328
    qError("pMsg->info.ahandle is NULL");
    return;
  }
H
Haojun Liao 已提交
1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
L
Liu Jicong 已提交
1345
}
L
Liu Jicong 已提交
1346

1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = tinfo;
  SArray* plist = getTableListInfo(pTaskInfo);

  // only extract table in the first elements
  STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);

  SArray* pUidList = taosArrayInit(10, sizeof(uint64_t));

  int32_t numOfTables = tableListGetSize(pTableListInfo);
  for(int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
    taosArrayPush(pUidList, &pKeyInfo->uid);
  }

  taosArrayDestroy(plist);
  return pUidList;
}
1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385

static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScanInfo = pOperator->info;
    STableScanInfo*  pTableScanInfo = pScanInfo->pTableScanOp->info;
    taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
  } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pScanInfo = pOperator->info;
    taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
  } else {
    if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
      extractTableList(pList, pOperator->pDownstream[0]);
    }
  }
}

SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
  SArray*        pArray = taosArrayInit(0, POINTER_BYTES);
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  extractTableList(pArray, pOperator);
  return pArray;
L
liuyao 已提交
1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398
}

int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tInfo;
  pTaskInfo->pRoot->fpSet.releaseStreamStateFn(pTaskInfo->pRoot);
  return 0;
}

int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tInfo;
  pTaskInfo->pRoot->fpSet.reloadStreamStateFn(pTaskInfo->pRoot);
  return 0;
}