executor.c 48.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */

#include "executor.h"
17 18
#include <libs/transport/trpc.h>
#include <libs/wal/wal.h>
19 20
#include "executorInt.h"
#include "operator.h"
21
#include "planner.h"
22
#include "querytask.h"
L
Liu Jicong 已提交
23
#include "tdatablock.h"
L
Liu Jicong 已提交
24
#include "tref.h"
25
#include "tudf.h"
26 27

#include "storageapi.h"
28 29 30 31 32 33 34 35

static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t             exchangeObjRefPool = -1;

static void cleanupRefPool() {
  int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
  taosCloseRef(ref);
}
36

X
Xiaoyu Wang 已提交
37 38
static void initRefPool() {
  exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
D
dapan1121 已提交
39 40 41
  atexit(cleanupRefPool);
}

L
Liu Jicong 已提交
42 43 44 45
// Walk down a single-downstream operator chain to the stream scan operator and queue the SMA input on it.
// Every operator visited (including the stream scan) is reset to OP_NOT_OPENED.
//
// @param pOperator   operator to start from (normally the task root)
// @param input       input payload; its layout depends on `type`
// @param numOfBlocks number of entries in `input` for the array-shaped input types
// @param type        STREAM_INPUT__MERGED_SUBMIT, STREAM_INPUT__DATA_SUBMIT or STREAM_INPUT__DATA_BLOCK;
//                    any other value queues nothing
// @param id          task id string, used for logging only
// @return TSDB_CODE_SUCCESS, or TSDB_CODE_APP_ERROR when no unique path to a stream scan operator exists
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;
  SStreamScanInfo* pInfo = pOperator->info;

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    // `input` is a contiguous array of numOfBlocks SPackedData entries; each is copied by value.
    for (size_t i = 0; i < numOfBlocks; i++) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      taosArrayPush(pInfo->pBlockLists, pReq);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    // NOTE(review): the other branches store SPackedData values in pBlockLists, and doSetStreamBlock pushes
    // `input` for this type, whereas this pushes `&input`, i.e. copies one array element's worth of bytes out of
    // the local pointer variable. Verify what the SMA callers pass for this type before changing it.
    taosArrayPush(pInfo->pBlockLists, &input);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    // `input` is an array of SSDataBlock; wrap a pointer to each one in an SPackedData.
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {.pDataBlock = pDataBlock};
      taosArrayPush(pInfo->pBlockLists, &tmp);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
84
// Reset every operator above the stream scan operator to OP_NOT_OPENED so the chain is reopened on the
// next execution. The stream scan operator itself is not modified.
//
// @param pOperator operator to start from (normally the task root)
// @param id        task id string, used for logging only
// @return TSDB_CODE_SUCCESS, or TSDB_CODE_APP_ERROR when no unique path to a stream scan operator exists
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, const char* id) {
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    return TSDB_CODE_SUCCESS;
  }

  if (pOperator->numOfDownstream == 0) {
    qError("failed to find stream scan operator to set the input data block, %s", id);
    return TSDB_CODE_APP_ERROR;
  }

  if (pOperator->numOfDownstream > 1) {  // not handle this in join query
    qError("join not supported for stream block scan, %s", id);
    return TSDB_CODE_APP_ERROR;
  }

  pOperator->status = OP_NOT_OPENED;
  return doSetStreamOpOpen(pOperator->pDownstream[0], id);
}

102 103 104 105 106 107 108 109 110 111 112
// Drop the blocks buffered in the stream scan operator reachable from pOperator.
// Only single-downstream chains are followed; other tree shapes are left untouched.
static void clearStreamBlock(SOperatorInfo* pOperator) {
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pInfo = pOperator->info;
    doClearBufferedBlocks(pInfo);
    return;
  }

  // A void function may not `return` an expression (C11 6.8.6.4); just recurse.
  if (pOperator->numOfDownstream == 1) {
    clearStreamBlock(pOperator->pDownstream[0]);
  }
}

5
54liuyao 已提交
113 114 115 116 117 118
// Clear the task's error code and drop any blocks still buffered in its stream scan operator.
void resetTaskInfo(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = tinfo;

  pTask->code = 0;
  clearStreamBlock(pTask->pRoot);
}

119 120 121 122 123 124
// Reset the fill-history window start key to INT64_MIN. A NULL handle is ignored.
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  if (pTask != NULL) {
    qDebug("%s set fill history start key:%" PRId64, GET_TASKID(pTask), INT64_MIN);
    pTask->streamInfo.fillHistoryWindow.skey = INT64_MIN;
  }
}

129
// Walk down a single-downstream operator chain to the stream scan operator and queue one batch of stream input
// on it. Every operator visited (including the stream scan) is reset to OP_NOT_OPENED.
//
// @param pOperator   operator to start from (normally the task root)
// @param input       input payload; layout depends on `type`:
//                      MERGED_SUBMIT -> array of numOfBlocks SPackedData
//                      DATA_SUBMIT   -> one SPackedData
//                      DATA_BLOCK    -> array of numOfBlocks SSDataBlock
// @param numOfBlocks number of entries in `input` for the array-shaped types
// @param type        input type; any other value trips ASSERT(0)
// @param id          task id string, used for logging only
// @return TSDB_CODE_SUCCESS, or TSDB_CODE_APP_ERROR when no unique path to a stream scan operator exists
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("s-task:%s in this batch, %d blocks need to be processed", id, (int32_t)numOfBlocks);
  // The buffered block list must be empty, with its cursor reset, before a new batch is attached.
  ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    for (size_t i = 0; i < numOfBlocks; i++) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      taosArrayPush(pInfo->pBlockLists, pReq);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    taosArrayPush(pInfo->pBlockLists, input);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {.pDataBlock = pDataBlock};
      taosArrayPush(pInfo->pBlockLists, &tmp);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}

175
// Propagate the task id string to the tsdb reader of the stream scan operator found by following the first
// downstream of each operator. Operators without a downstream end the descent.
//
// @param pOperator operator to start from; NULL is ignored
// @param pAPI      storage API used to set the reader's task id
void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
  if (pOperator == NULL) {
    return;
  }

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    // Leaf operators have no downstream: stop here instead of dereferencing a missing pDownstream[0].
    if (pOperator->numOfDownstream > 0 && pOperator->pDownstream != NULL) {
      doSetTaskId(pOperator->pDownstream[0], pAPI);
    }
    return;
  }

  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pStreamScanInfo = pOperator->info;
  if (pStreamScanInfo->pTableScanOp != NULL) {
    STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
    if (pScanInfo->base.dataReader != NULL) {
      pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str);
    }
  }
}

H
Haojun Liao 已提交
190 191 192
// Store the query id, rebuild the task id string from (taskId, queryId), and push the new id string down to the
// stream scan's tsdb reader.
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
  SExecTaskInfo* pTask = tinfo;
  SStorageAPI*   pAPI = &pTask->storageAPI;

  pTask->id.queryId = queryId;
  buildTaskId(taskId, queryId, pTask->id.str);

  doSetTaskId(pTask->pRoot, pAPI);  // set the idstr for tsdbReader
}

wmmhello's avatar
wmmhello 已提交
199 200 201 202
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
//  SExecTaskInfo* pTaskInfo = tinfo;
//  pTaskInfo->code = code;
//}
203

L
Liu Jicong 已提交
204 205
// Reset the operators above the task's stream scan operator to OP_NOT_OPENED.
//
// @param tinfo task handle
// @return TSDB_CODE_APP_ERROR for a NULL handle, otherwise the result of doSetStreamOpOpen
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  int32_t code = doSetStreamOpOpen(pTaskInfo->pRoot, GET_TASKID(pTaskInfo));
  // The messages describe what this function does (operator status reset), not stream block injection.
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed to reset the stream operators status, code:%s", GET_TASKID(pTaskInfo), tstrerror(code));
  } else {
    qDebug("%s reset the stream operators status successfully", GET_TASKID(pTaskInfo));
  }

  return code;
}
220

L
liuyao 已提交
221
// Report the stream data version and checkpoint id recorded in the task's stream info.
void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId) {
  const SExecTaskInfo* pTask = (const SExecTaskInfo*)tinfo;

  int64_t version = pTask->streamInfo.dataVersion;
  *dataVer = version;

  int64_t checkpoint = pTask->streamInfo.checkPointId;
  *ckId = checkpoint;
}


L
Liu Jicong 已提交
228
// Attach one batch of stream input blocks to the task's stream scan operator (see doSetStreamBlock for the
// accepted layouts of pBlocks per `type`).
//
// @return TSDB_CODE_APP_ERROR for a NULL handle; TSDB_CODE_SUCCESS for empty input; otherwise the
//         result of doSetStreamBlock
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }
  if (pBlocks == NULL || numOfBlocks == 0) {
    return TSDB_CODE_SUCCESS;  // nothing to attach
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetStreamBlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTask));
  }
  return code;
}

L
Liu Jicong 已提交
249 250
// Attach SMA input blocks to the task's stream scan operator via doSetSMABlock.
//
// @return TSDB_CODE_APP_ERROR for a NULL handle; TSDB_CODE_SUCCESS for empty input; otherwise the
//         result of doSetSMABlock
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }
  if (pBlocks == NULL || numOfBlocks == 0) {
    return TSDB_CODE_SUCCESS;  // nothing to attach
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetSMABlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTask));
  }
  return code;
}

X
Xiaoyu Wang 已提交
270 271 272
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
                                     uint64_t id) {
  if (msg == NULL) {  // create raw scan
273
    SExecTaskInfo* pTaskInfo = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api);
274 275 276 277
    if (NULL == pTaskInfo) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
278

279
    pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo);
L
Liu Jicong 已提交
280
    if (NULL == pTaskInfo->pRoot) {
281 282 283 284
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(pTaskInfo);
      return NULL;
    }
285

286
    pTaskInfo->storageAPI = pReaderHandle->api;
287
    qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pTaskInfo));
288
    return pTaskInfo;
L
Liu Jicong 已提交
289 290
  }

H
Haojun Liao 已提交
291 292
  SSubplan* pPlan = NULL;
  int32_t   code = qStringToSubplan(msg, &pPlan);
L
Liu Jicong 已提交
293 294 295 296 297 298
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTaskInfo = NULL;
299
  code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
L
Liu Jicong 已提交
300
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
301 302
    nodesDestroyNode((SNode*)pPlan);
    qDestroyTask(pTaskInfo);
L
Liu Jicong 已提交
303 304 305 306
    terrno = code;
    return NULL;
  }

307
  // extract the number of output columns
H
Haojun Liao 已提交
308
  SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc;
wmmhello's avatar
wmmhello 已提交
309
  *numOfCols = 0;
310

L
Liu Jicong 已提交
311
  SNode* pNode;
312 313 314
  FOREACH(pNode, pDescNode->pSlots) {
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
    if (pSlotDesc->output) {
wmmhello's avatar
wmmhello 已提交
315
      ++(*numOfCols);
316 317 318
    }
  }

L
Liu Jicong 已提交
319 320 321
  return pTaskInfo;
}

322
// Create a stream-mode execution task from a serialized subplan.
//
// @return the task handle; NULL when msg is NULL, or NULL with terrno set when parsing or task
//         creation fails (the parsed plan and any partial task are released in that case)
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId) {
  if (msg == NULL) {
    return NULL;
  }

  SSubplan*   pPlan = NULL;
  qTaskInfo_t pTaskInfo = NULL;
  int32_t     code = qStringToSubplan(msg, &pPlan);

  if (code == TSDB_CODE_SUCCESS) {
    code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
    if (code != TSDB_CODE_SUCCESS) {
      nodesDestroyNode((SNode*)pPlan);
      qDestroyTask(pTaskInfo);
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }
  return pTaskInfo;
}
345

346 347
// Return a new array (element size sizeof(tb_uid_t)) holding the uids from tableIdList that belong to the
// scanner's source table(s) and pass its tag condition, if any.
//
// Rules applied per uid:
//   - uids whose meta entry cannot be read are skipped (logged);
//   - super tables are skipped;
//   - if the scan source is a super table, only child tables of that super table (matching suid) are kept;
//   - otherwise only the exact source table uid is kept;
//   - if pTagCond is set, the table must also qualify under it (evaluation failures are logged and skipped).
//
// The caller owns the returned array.
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,
                                       SStorageAPI* pAPI) {
  SArray* pAccepted = taosArrayInit(4, sizeof(tb_uid_t));

  int32_t total = taosArrayGetSize(tableIdList);
  if (total == 0) {
    return pAccepted;
  }

  // describe the table(s) this scanner was built for
  STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
  uint64_t        targetSuid = 0;
  uint64_t        targetUid = 0;
  int32_t         targetType = 0;
  tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &targetSuid, &targetUid, &targetType);

  SMetaReader mr = {0};
  pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, 0, &pAPI->metaFn);

  for (int32_t idx = 0; idx < total; ++idx) {
    uint64_t* pUid = (uint64_t*)taosArrayGet(tableIdList, idx);

    int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, *pUid);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *pUid, tstrerror(terrno), idstr);
      continue;
    }

    tDecoderClear(&mr.coder);

    // Decide whether the table belongs to the scanned source.
    bool belongs;
    if (mr.me.type == TSDB_SUPER_TABLE) {
      belongs = false;
    } else if (targetType == TSDB_SUPER_TABLE) {
      // a newly created child table must belong to the scanned super table
      belongs = (mr.me.type == TSDB_CHILD_TABLE) && (mr.me.ctbEntry.suid == targetSuid);
    } else {
      // The scanned target is an ordinary table: when the WAL is replayed during vnode restore, every newly created
      // ordinary table is seen here, so make sure this one is the destination table.
      belongs = (mr.me.uid == targetUid);
    }
    if (!belongs) {
      continue;
    }

    if (pScanInfo->pTagCond != NULL) {
      bool          qualified = false;
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};

      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.vnode, &qualified, pAPI);
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
        continue;
      }
      if (!qualified) {
        continue;
      }
    }

    // handle multiple partition
    taosArrayPush(pAccepted, pUid);
  }

  pAPI->metaReaderFn.clearReader(&mr);
  return pAccepted;
}

414
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
dengyihao's avatar
dengyihao 已提交
415
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
416 417
  const char*    id = GET_TASKID(pTaskInfo);
  int32_t        code = 0;
H
Haojun Liao 已提交
418 419

  if (isAdd) {
420
    qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);
H
Haojun Liao 已提交
421 422
  }

423
  // traverse to the stream scanner node to add this table id
424
  SOperatorInfo*   pInfo = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
425
  SStreamScanInfo* pScanInfo = pInfo->info;
426

427
  if (isAdd) {  // add new table id
428
    SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, id, &pTaskInfo->storageAPI);
429
    int32_t numOfQualifiedTables = taosArrayGetSize(qa);
430
    qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
431
    code = pTaskInfo->storageAPI.tqReaderFn.tqReaderAddTables(pScanInfo->tqReader, qa);
432
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
433
      taosArrayDestroy(qa);
434 435 436
      return code;
    }

M
Minglei Jin 已提交
437
    bool   assignUid = false;
L
Liu Jicong 已提交
438 439
    size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
    char*  keyBuf = NULL;
440
    if (bufLen > 0) {
441
      assignUid = groupbyTbname(pScanInfo->pGroupTags);
442 443
      keyBuf = taosMemoryMalloc(bufLen);
      if (keyBuf == NULL) {
H
Haojun Liao 已提交
444
        taosArrayDestroy(qa);
445 446 447
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
448

449
    STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo;
450
    taosWLockLatch(&pTaskInfo->lock);
451 452

    for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
453
      uint64_t*     uid = taosArrayGet(qa, i);
454
      STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
455 456

      if (bufLen > 0) {
457 458 459
        if (assignUid) {
          keyInfo.groupId = keyInfo.uid;
        } else {
460
          code = getGroupIdFromTagsVal(pScanInfo->readHandle.vnode, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
461
                                       &keyInfo.groupId, &pTaskInfo->storageAPI);
462
          if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
463
            taosMemoryFree(keyBuf);
H
Haojun Liao 已提交
464
            taosArrayDestroy(qa);
465
            taosWUnLockLatch(&pTaskInfo->lock);
466 467
            return code;
          }
468 469 470
        }
      }

H
Haojun Liao 已提交
471
      tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
472 473
    }

474
    taosWUnLockLatch(&pTaskInfo->lock);
475 476 477 478
    if (keyBuf != NULL) {
      taosMemoryFree(keyBuf);
    }

479 480
    taosArrayDestroy(qa);
  } else {  // remove the table id in current list
481
    qDebug("%d remove child tables from the stream scanner, %s", (int32_t)taosArrayGetSize(tableIdList), id);
482
    taosWLockLatch(&pTaskInfo->lock);
483
    code = pTaskInfo->storageAPI.tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
484
    taosWUnLockLatch(&pTaskInfo->lock);
485 486
  }

487
  return code;
L
fix  
Liu Jicong 已提交
488
}
489

490
// Report the schema/tag versions and the db/table names cached in the task's schemaInfo.
// When no schema wrapper (sw) is cached, the output parameters are left untouched.
// A NULL cached name produces an empty string.
// NOTE(review): strcpy is unbounded; callers must pass buffers large enough for the cached names.
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
                                    int32_t* tversion) {
  ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  if (pTask->schemaInfo.sw != NULL) {
    *sversion = pTask->schemaInfo.sw->version;
    *tversion = pTask->schemaInfo.tversion;

    if (pTask->schemaInfo.dbname == NULL) {
      dbName[0] = 0;
    } else {
      strcpy(dbName, pTask->schemaInfo.dbname);
    }

    if (pTask->schemaInfo.tablename == NULL) {
      tableName[0] = 0;
    } else {
      strcpy(tableName, pTask->schemaInfo.tablename);
    }
  }

  return TSDB_CODE_SUCCESS;
}
514 515

int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
D
dapan1121 已提交
516
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
517 518 519
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
  taosThreadOnce(&initPoolOnce, initRefPool);

520
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
521

522
  int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
523
  if (code != TSDB_CODE_SUCCESS) {
524
    qError("failed to createExecTaskInfo, code: %s", tstrerror(code));
525 526 527
    goto _error;
  }

528
  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
H
Haojun Liao 已提交
529
  code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI);
530
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
531
    qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
532 533 534 535 536
    goto _error;
  }

  if (handle) {
    void* pSinkParam = NULL;
537
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
538
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
539
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
540 541 542
      goto _error;
    }

H
Haojun Liao 已提交
543
    // pSinkParam has been freed during create sinker.
H
Haojun Liao 已提交
544
    code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
545 546
  }

547
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
548

dengyihao's avatar
dengyihao 已提交
549
_error:
550 551 552 553
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

H
Haojun Liao 已提交
554
static void freeBlock(void* param) {
555
  SSDataBlock* pBlock = *(SSDataBlock**)param;
H
Haojun Liao 已提交
556 557 558
  blockDataDestroy(pBlock);
}

559
// Execute the task and collect result blocks into pResList.
//
// Blocks are copied into pTaskInfo->pResultBlockList, which is reused across calls: existing entries are
// overwritten with copyDataBlock, and new ones are appended. pResList receives pointers into that cache, so the
// blocks stay owned by the task (see qCleanExecTaskBlockBuf). Collection stops once at least 4096 rows have been
// gathered or the root operator is exhausted.
//
// @param tinfo    task handle
// @param pResList cleared on entry, then filled with SSDataBlock* entries
// @param useconds set to the accumulated execution time once the root operator returns no more data
// @param hasMore  true when execution stopped before the root operator was exhausted; false on every early exit
// @param pLocal   optional local-fetch settings copied into the task
// @return TSDB_CODE_QRY_IN_EXEC if another thread owns the task; TSDB_CODE_SUCCESS if the task was already killed;
//         otherwise pTaskInfo->code (the longjmp error code, or TSDB_CODE_OUT_OF_MEMORY if caching a block failed)
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  // Give the out-parameter a defined value on every return path, including the early exits below.
  *hasMore = false;

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClear(pResList);

  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  int32_t      blockIndex = 0;
  SSDataBlock* pRes = NULL;
  int64_t      st = taosGetTimestampUs();

  while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
    SSDataBlock* p = NULL;
    if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
      // grow the reusable result-block cache; a failed copy or push must not be dereferenced below
      p = createOneDataBlock(pRes, true);
      if (p == NULL) {
        pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
        break;
      }
      if (taosArrayPush(pTaskInfo->pResultBlockList, &p) == NULL) {
        blockDataDestroy(p);
        pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
        break;
      }
    } else {
      p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
      copyDataBlock(p, pRes);
    }

    blockIndex += 1;

    current += p->info.rows;
    ASSERT(p->info.rows > 0);
    taosArrayPush(pResList, &p);

    if (current >= 4096) {  // enough rows for one round; resume on the next call
      break;
    }
  }

  *hasMore = (pRes != NULL);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  // signed, to match the PRId64 conversion below
  int64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

H
Haojun Liao 已提交
646 647
// Destroy every cached result block of the task and empty the cache array (the array itself is kept).
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  SArray*        pBlocks = pTask->pResultBlockList;

  for (size_t i = 0, n = taosArrayGetSize(pBlocks); i < n; ++i) {
    SSDataBlock** ppBlock = taosArrayGet(pBlocks, i);
    blockDataDestroy(*ppBlock);
  }

  taosArrayClear(pBlocks);
}

658 659 660 661 662 663 664 665 666 667 668 669 670
// Execute the task for one step and return the next result block.
//
// @param tinfo    task handle
// @param pRes     receives the next block from the root operator, or NULL when exhausted/killed/failed
// @param useconds set to the accumulated execution time once the root operator returns no more data
// @return TSDB_CODE_QRY_IN_EXEC if another thread owns the task; TSDB_CODE_SUCCESS if the task was already killed
//         (its buffered stream blocks are cleared); otherwise pTaskInfo->code
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;
  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    clearStreamBlock(pTaskInfo->pRoot);
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  // signed, to match the PRId64 conversion below
  int64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

L
Liu Jicong 已提交
715
// Register an exchange operator's stop info on the task (under the stop-info write latch) so that
// qStopTaskOperators can wake it.
//
// @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the entry could not be appended (previously this
//         failure was silently ignored and the operator would never be woken on kill)
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  void* pSlot = taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  return (pSlot != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}

// Order SExchangeOpStopInfo entries by refId: returns -1, 0 or 1.
int32_t stopInfoComp(void const* lp, void const* rp) {
  const SExchangeOpStopInfo* pLeft = (const SExchangeOpStopInfo*)lp;
  const SExchangeOpStopInfo* pRight = (const SExchangeOpStopInfo*)rp;

  // (a > b) - (a < b) yields exactly -1 / 0 / 1 without branches
  return (pLeft->refId > pRight->refId) - (pLeft->refId < pRight->refId);
}

L
Liu Jicong 已提交
736
// Remove the stop-info entry whose refId equals pInfo->refId, if present, under the stop-info write latch.
// NOTE(review): taosArraySearchIdx may rely on the array being ordered by refId, while entries are appended in call
// order by qAppendTaskStopInfo — confirm refIds are monotonically assigned.
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  SArray* pList = pTaskInfo->stopInfo.pStopInfo;
  int32_t pos = taosArraySearchIdx(pList, pInfo, stopInfoComp, TD_EQ);
  if (pos >= 0) {
    taosArrayRemove(pList, pos);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}

// Wake every registered exchange operator of the task by posting its `ready` semaphore. Each operator is looked up
// in the exchange reference pool; entries no longer present in the pool are skipped.
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  SArray* pList = pTaskInfo->stopInfo.pStopInfo;
  int32_t total = taosArrayGetSize(pList);
  for (int32_t idx = 0; idx < total; ++idx) {
    SExchangeOpStopInfo* pStop = taosArrayGet(pList, idx);
    SExchangeInfo*       pExchange = taosAcquireRef(exchangeObjRefPool, pStop->refId);
    if (pExchange == NULL) {
      continue;
    }

    tsem_post(&pExchange->ready);
    taosReleaseRef(exchangeObjRefPool, pStop->refId);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}

D
dapan1121 已提交
761
// Mark the task killed with rspCode and wake its exchange operators, without waiting for the executing
// thread to finish.
//
// @return TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise TSDB_CODE_SUCCESS
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  if (pTask == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask async killed", GET_TASKID(pTask));
  setTaskKilled(pTask, rspCode);
  qStopTaskOperators(pTask);

  return TSDB_CODE_SUCCESS;
}

775 776 777 778 779 780
// Synchronously kill the task: mark it killed with TSDB_CODE_TSC_QUERY_KILLED, poll every 10 ms until no thread
// owns it, then store rspCode as the task's code.
//
// @return TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise TSDB_CODE_SUCCESS
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s sync killed execTask", GET_TASKID(pTask));
  setTaskKilled(pTask, TSDB_CODE_TSC_QUERY_KILLED);

  for (; qTaskIsExecuting(pTask);) {
    taosMsleep(10);
  }

  pTask->code = rspCode;
  return TSDB_CODE_SUCCESS;
}

792 793 794 795 796 797 798 799 800
// True when some thread currently owns (is executing) the task; false for a NULL handle.
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  return (pTask != NULL) && (atomic_load_64(&pTask->owner) != 0);
}

H
Haojun Liao 已提交
801 802
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo* pSummary = &pTaskInfo->cost;
803
  int64_t        idleTime = pSummary->start - pSummary->created;
H
Haojun Liao 已提交
804 805 806 807

  SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
  if (pSummary->pRecoder != NULL) {
    qDebug(
808
        "%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
809
        "createGroupIdMap:%.2f ms, total blocks:%d, "
H
Haojun Liao 已提交
810
        "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
811 812 813 814 815 816
        GET_TASKID(pTaskInfo), idleTime / 1000.0, pSummary->elapsedTime / 1000.0, pSummary->extractListTime,
        pSummary->groupIdMapTime, pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks,
        pRecorder->totalRows, pRecorder->totalCheckedRows);
  } else {
    qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idleTime / 1000.0,
           pSummary->elapsedTime / 1000.0);
H
Haojun Liao 已提交
817 818 819
  }
}

820 821 822 823 824 825
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

H
Haojun Liao 已提交
826 827 828 829 830
  if (pTaskInfo->pRoot != NULL) {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
  } else {
    qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
  }
831

H
Haojun Liao 已提交
832
  printTaskExecCostInLog(pTaskInfo);  // print the query cost summary
833 834 835
  doDestroyTask(pTaskInfo);
}

H
Haojun Liao 已提交
836
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
837
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
H
Haojun Liao 已提交
838
  return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList);
839 840 841 842 843 844 845 846 847
}

/*
 * Serialize the state of the task's operators.
 *
 * Operator state encoding is currently disabled. On success nothing is written to
 * *pOutput or *len.
 *
 * @return TSDB_CODE_INVALID_PARA if the handle is NULL or the task has no root operator,
 *         otherwise 0.
 */
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL || pTaskInfo->pRoot == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // TODO: re-enable operator state encoding (encodeOperator(pTaskInfo->pRoot, pOutput, len, ...)).
  //       When no operator carries state, *pOutput should be freed and *len reset to 0.
  (void)pOutput;
  (void)len;
  return 0;
}

/*
 * Restore the state of the task's operators from pInput.
 *
 * Decoding is currently disabled: the arguments are validated and 0 is returned without
 * touching the task.
 *
 * @return TSDB_CODE_INVALID_PARA for a NULL handle, NULL input or zero length, otherwise 0.
 */
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  bool badArgs = (pTaskInfo == NULL) || (pInput == NULL) || (len == 0);
  if (badArgs) {
    return TSDB_CODE_INVALID_PARA;
  }

  // TODO: decodeOperator(pTaskInfo->pRoot, pInput, len) once operator state decoding is re-enabled
  return 0;
}

/*
 * Descend from the task's root operator through single-downstream links and return, in
 * *scanner, the `info` of the first QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN operator found.
 *
 * @return 0 on success.
 *         -1 if the chain ends or branches before a stream-scan operator is reached; in that
 *         case *scanner is set to NULL and terrno to TSDB_CODE_PAR_INTERNAL_ERROR.
 */
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (pOperator != NULL) {
    if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return 0;
    }

    // Only a linear chain is expected. Stop instead of reading pDownstream[0] of a leaf
    // or branching operator.
    if (pOperator->numOfDownstream != 1) {
      break;
    }
    pOperator = pOperator->pDownstream[0];
  }

  qError("failed to find stream scan operator, %s", GET_TASKID(pTaskInfo));
  *scanner = NULL;
  terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
  return -1;
}

L
liuyao 已提交
883
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
884 885
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
886 887 888 889 890 891

  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;

  pStreamInfo->fillHistoryVer = *pVerRange;
  pStreamInfo->fillHistoryWindow = *pWindow;
  pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1;
L
liuyao 已提交
892 893
  pStreamInfo->recoverStep1Finished = false;
  pStreamInfo->recoverStep2Finished = false;
894

895
  qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
L
liuyao 已提交
896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913
         " - %" PRId64,
         GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
         pWindow->ekey);
  return 0;
}

/*
 * Prepare the stream source scanner for step 2 of the scan-history procedure. It:
 * - stores the version range and time window to fill;
 * - sets recoverStep to STREAM_RECOVER_STEP__PREPARE2;
 * - marks step 1 as finished and step 2 as not finished.
 * Always returns 0.
 */
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);

  SStreamTaskInfo* pStream = &pTaskInfo->streamInfo;

  pStream->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
  pStream->fillHistoryVer = *pVerRange;
  pStream->fillHistoryWindow = *pWindow;
  pStream->recoverStep1Finished = true;
  pStream->recoverStep2Finished = false;

  qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64
         " - %" PRId64 ", window:%" PRId64 " - %" PRId64,
         GET_TASKID(pTaskInfo), pStream->fillHistoryVer.minVer, pStream->fillHistoryVer.maxVer, pWindow->skey,
         pWindow->ekey);
  return 0;
}

/* Leave scan-history mode by resetting recoverStep to STREAM_RECOVER_STEP__NONE. Always returns 0. */
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  SStreamTaskInfo* pStream = &pTask->streamInfo;
  pStream->recoverStep = STREAM_RECOVER_STEP__NONE;
  return 0;
}

928
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
929 930 931 932
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (1) {
933 934 935
    int32_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
        type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
936
      SStreamIntervalOperatorInfo* pInfo = pOperator->info;
937
      STimeWindowAggSupp* pSup = &pInfo->twAggSup;
L
Liu Jicong 已提交
938

939 940
      ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
      ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
L
Liu Jicong 已提交
941

942 943 944 945 946 947
      qInfo("save stream param for interval: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);

      pSup->calTriggerSaved = pSup->calTrigger;
      pSup->deleteMarkSaved = pSup->deleteMark;
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
      pSup->deleteMark = INT64_MAX;
948 949
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
      pInfo->ignoreExpiredData = false;
950 951 952
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
953
      SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
954 955 956 957
      STimeWindowAggSupp* pSup = &pInfo->twAggSup;

      ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
      ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
L
Liu Jicong 已提交
958

959
      qInfo("save stream param for session: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);
L
Liu Jicong 已提交
960

961 962 963 964
      pSup->calTriggerSaved = pSup->calTrigger;
      pSup->deleteMarkSaved = pSup->deleteMark;
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
      pSup->deleteMark = INT64_MAX;
965 966
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
      pInfo->ignoreExpiredData = false;
967
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
968
      SStreamStateAggOperatorInfo* pInfo = pOperator->info;
969 970 971 972
      STimeWindowAggSupp* pSup = &pInfo->twAggSup;

      ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
      ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
L
Liu Jicong 已提交
973

974
      qInfo("save stream param for state: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);
L
Liu Jicong 已提交
975

976 977 978 979
      pSup->calTriggerSaved = pSup->calTrigger;
      pSup->deleteMarkSaved = pSup->deleteMark;
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
      pSup->deleteMark = INT64_MAX;
980 981
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
      pInfo->ignoreExpiredData = false;
982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999
    }

    // iterate operator tree
    if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
      if (pOperator->numOfDownstream > 1) {
        qError("unexpected stream, multiple downstream");
        ASSERT(0);
        return -1;
      }
      return 0;
    } else {
      pOperator = pOperator->pDownstream[0];
    }
  }

  return 0;
}

1000
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
1001
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1002
  const char*    id = GET_TASKID(pTaskInfo);
1003 1004 1005
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (1) {
1006 1007 1008
    uint16_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
        type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
1009
      SStreamIntervalOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1010 1011
      pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
      pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
1012
      pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
1013 1014 1015 1016 1017
      qInfo("%s restore stream agg executors param for interval: %d,  %" PRId64, id, pInfo->twAggSup.calTrigger,
            pInfo->twAggSup.deleteMark);
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
1018
      SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1019 1020
      pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
      pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
1021
      pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
1022 1023 1024
      qInfo("%s restore stream agg executor param for session: %d,  %" PRId64, id, pInfo->twAggSup.calTrigger,
            pInfo->twAggSup.deleteMark);
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
1025
      SStreamStateAggOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1026 1027
      pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
      pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
1028
      pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
1029 1030
      qInfo("%s restore stream agg executor param for state: %d,  %" PRId64, id, pInfo->twAggSup.calTrigger,
            pInfo->twAggSup.deleteMark);
1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043
    }

    // iterate operator tree
    if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
      if (pOperator->numOfDownstream > 1) {
        qError("unexpected stream, multiple downstream");
        return -1;
      }
      return 0;
    } else {
      pOperator = pOperator->pDownstream[0];
    }
  }
1044
}
1045

L
Liu Jicong 已提交
1046 1047 1048 1049
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.recoverScanFinished;
}
1050

L
liuyao 已提交
1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.recoverStep1Finished;
}

/* Report whether step 2 of the scan-history procedure is marked finished. */
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo) {
  const SStreamTaskInfo* pStream = &((SExecTaskInfo*)tinfo)->streamInfo;
  return pStream->recoverStep2Finished;
}

/*
 * Mark both scan-history steps as finished and reset the start key of the fill-history
 * window to INT64_MIN. Always returns 0.
 */
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) {
  SStreamTaskInfo* pStream = &((SExecTaskInfo*)tinfo)->streamInfo;

  pStream->recoverStep1Finished = true;
  pStream->recoverStep2Finished = true;

  // reset the time window
  pStream->fillHistoryWindow.skey = INT64_MIN;
  return 0;
}

1071 1072 1073 1074 1075
void* qExtractReaderFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return (void*)pInfo->tqReader;
}

wmmhello's avatar
wmmhello 已提交
1076 1077 1078 1079 1080 1081 1082 1083
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.schema;
}

/* Return the table name stored in the task's stream info (not copied). */
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.tbName;
}

wmmhello's avatar
wmmhello 已提交
1086
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
1087
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
wmmhello's avatar
wmmhello 已提交
1088
  return &pTaskInfo->streamInfo.metaRsp;
1089 1090
}

1091
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
wmmhello's avatar
wmmhello 已提交
1092
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1093
  memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal));
1094 1095
}

H
Haojun Liao 已提交
1096
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
1097 1098
  memset(pCond, 0, sizeof(SQueryTableDataCond));
  pCond->order = TSDB_ORDER_ASC;
H
Haojun Liao 已提交
1099
  pCond->numOfCols = pMtInfo->schema->nCols;
1100
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
H
Haojun Liao 已提交
1101 1102 1103 1104
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
  if (pCond->colList == NULL || pCond->pSlotList == NULL) {
    taosMemoryFreeClear(pCond->colList);
    taosMemoryFreeClear(pCond->pSlotList);
S
Shengliang Guan 已提交
1105
    terrno = TSDB_CODE_OUT_OF_MEMORY;
1106 1107 1108
    return terrno;
  }

H
Haojun Liao 已提交
1109
  pCond->twindows = TSWINDOW_INITIALIZER;
H
Haojun Liao 已提交
1110
  pCond->suid = pMtInfo->suid;
1111 1112 1113 1114 1115
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
H
Haojun Liao 已提交
1116 1117 1118 1119 1120 1121
    SColumnInfo* pColInfo = &pCond->colList[i];
    pColInfo->type = pMtInfo->schema->pSchema[i].type;
    pColInfo->bytes = pMtInfo->schema->pSchema[i].bytes;
    pColInfo->colId = pMtInfo->schema->pSchema[i].colId;

    pCond->pSlotList[i] = i;
1122 1123 1124 1125 1126
  }

  return TSDB_CODE_SUCCESS;
}

1127 1128
void qStreamSetOpen(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
X
Xiaoyu Wang 已提交
1129
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
1130 1131 1132
  pOperator->status = OP_NOT_OPENED;
}

1133
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
1134
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1135 1136
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;

1137
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
X
Xiaoyu Wang 已提交
1138
  const char*    id = GET_TASKID(pTaskInfo);
1139

1140 1141 1142 1143 1144 1145 1146 1147 1148 1149
  if(subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG){
    pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
    if (pOperator == NULL) {
      return -1;
    }
    SStreamScanInfo* pInfo = pOperator->info;
    SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
    SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
    walReaderVerifyOffset(pWalReader, pOffset);
  }
wmmhello's avatar
wmmhello 已提交
1150
  // if pOffset equal to current offset, means continue consume
1151
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
1152 1153
    return 0;
  }
1154

1155
  if (subType == TOPIC_SUB_TYPE__COLUMN) {
1156
    pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
wmmhello's avatar
wmmhello 已提交
1157 1158
    if (pOperator == NULL) {
      return -1;
1159
    }
1160

1161
    SStreamScanInfo* pInfo = pOperator->info;
1162 1163
    STableScanInfo*  pScanInfo = pInfo->pTableScanOp->info;
    STableScanBase*  pScanBaseInfo = &pScanInfo->base;
1164
    STableListInfo*  pTableListInfo = pScanBaseInfo->pTableListInfo;
1165

1166
    if (pOffset->type == TMQ_OFFSET__LOG) {
1167
      // todo refactor: move away
1168
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader);
1169 1170
      pScanBaseInfo->dataReader = NULL;

H
Haojun Liao 已提交
1171 1172 1173 1174 1175 1176 1177
      SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
      SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
      walReaderVerifyOffset(pWalReader, pOffset);
      if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version + 1, id) < 0) {
        qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
        return -1;
      }
1178
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
1179 1180
      // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
      // those data are from the snapshot in tsdb, besides the data in the wal file.
1181 1182
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;
1183
      int32_t index = 0;
1184

1185 1186 1187 1188
      // this value may be changed if new tables are created
      taosRLockLatch(&pTaskInfo->lock);
      int32_t numOfTables = tableListGetSize(pTableListInfo);

1189
      if (uid == 0) {
1190 1191
        if (numOfTables != 0) {
          STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0);
1192 1193
          uid = pTableInfo->uid;
          ts = INT64_MIN;
1194
          pScanInfo->currentTable = 0;
1195
        } else {
1196 1197
          taosRUnLockLatch(&pTaskInfo->lock);
          qError("no table in table list, %s", id);
1198
          terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
L
Liu Jicong 已提交
1199 1200
          return -1;
        }
1201
      }
H
Haojun Liao 已提交
1202

X
Xiaoyu Wang 已提交
1203 1204
      qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% " PRId64 " rows returned", uid, ts,
             pInfo->pTableScanOp->resultInfo.totalRows);
1205
      pInfo->pTableScanOp->resultInfo.totalRows = 0;
H
Haojun Liao 已提交
1206

1207
      // start from current accessed position
H
Haojun Liao 已提交
1208 1209 1210
      // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
      // position, let's find it from the beginning.
      index = tableListFind(pTableListInfo, uid, 0);
1211
      taosRUnLockLatch(&pTaskInfo->lock);
1212

1213 1214 1215
      if (index >= 0) {
        pScanInfo->currentTable = index;
      } else {
H
Haojun Liao 已提交
1216 1217
        qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
               numOfTables, pScanInfo->currentTable, id);
1218
        terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
wmmhello's avatar
wmmhello 已提交
1219 1220
        return -1;
      }
1221

1222
      STableKeyInfo keyInfo = {.uid = uid};
X
Xiaoyu Wang 已提交
1223
      int64_t       oldSkey = pScanBaseInfo->cond.twindows.skey;
1224 1225 1226

      // let's start from the next ts that returned to consumer.
      pScanBaseInfo->cond.twindows.skey = ts + 1;
H
Haojun Liao 已提交
1227
      pScanInfo->scanTimes = 0;
1228

1229
      if (pScanBaseInfo->dataReader == NULL) {
1230
        int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
1231
                                      pScanInfo->pResBlock, (void**) &pScanBaseInfo->dataReader, id, false, NULL);
1232 1233 1234
        if (code != TSDB_CODE_SUCCESS) {
          qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
          terrno = code;
wmmhello's avatar
wmmhello 已提交
1235
          return -1;
L
Liu Jicong 已提交
1236
        }
1237

H
Haojun Liao 已提交
1238 1239
        qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
1240
      } else {
1241 1242
        pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
        pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
1243
        qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 "  table index:%d numOfTable:%d, %s",
H
Haojun Liao 已提交
1244
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
1245
      }
1246 1247 1248

      // restore the key value
      pScanBaseInfo->cond.twindows.skey = oldSkey;
1249
    } else {
1250
      qError("invalid pOffset->type:%d, %s", pOffset->type, id);
1251
      terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
1252 1253 1254
      return -1;
    }

1255
  } else {  // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
1256

1257 1258 1259
    if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
1260

X
Xiaoyu Wang 已提交
1261
      SOperatorInfo*  p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id);
1262 1263
      STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;

1264
      if (pAPI->snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) {
X
Xiaoyu Wang 已提交
1265
        qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
1266
        terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
1267 1268
        return -1;
      }
H
Haojun Liao 已提交
1269

H
Haojun Liao 已提交
1270
      SMetaTableInfo mtInfo = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext);
1271
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
1272
      pInfo->dataReader = NULL;
H
Haojun Liao 已提交
1273

1274 1275
      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tableListClear(pTableListInfo);
1276

1277
      if (mtInfo.uid == 0) {
1278
        goto end;  // no data
1279
      }
1280

1281 1282
      initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
      pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
H
Haojun Liao 已提交
1283

1284
      tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
1285

1286 1287
      STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
      int32_t        size = tableListGetSize(pTableListInfo);
1288

1289
      pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, (void**) &pInfo->dataReader, NULL,
1290
                     false, NULL);
L
Liu Jicong 已提交
1291

1292 1293
      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
1294
      tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
1295 1296
      pTaskInfo->streamInfo.schema = mtInfo.schema;

X
Xiaoyu Wang 已提交
1297
      qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
1298 1299 1300
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
1301
      if (pTaskInfo->storageAPI.snapshotFn.createSnapshot(sContext, pOffset->uid) != 0) {
1302
        qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
1303
        terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
1304 1305
        return -1;
      }
X
Xiaoyu Wang 已提交
1306 1307
      qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts,
             id);
1308 1309
    } else if (pOffset->type == TMQ_OFFSET__LOG) {
      SStreamRawScanInfo* pInfo = pOperator->info;
1310
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
1311
      pInfo->dataReader = NULL;
1312
      qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
wmmhello's avatar
wmmhello 已提交
1313
    }
1314
  }
1315 1316

end:
wmmhello's avatar
wmmhello 已提交
1317
  pTaskInfo->streamInfo.currentOffset = *pOffset;
1318

1319 1320
  return 0;
}
H
Haojun Liao 已提交
1321 1322 1323

void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
X
Xiaoyu Wang 已提交
1324
  if (pMsg->info.ahandle == NULL) {
wmmhello's avatar
wmmhello 已提交
1325 1326 1327
    qError("pMsg->info.ahandle is NULL");
    return;
  }
H
Haojun Liao 已提交
1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
L
Liu Jicong 已提交
1344
}
L
Liu Jicong 已提交
1345

1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = tinfo;
  SArray* plist = getTableListInfo(pTaskInfo);

  // only extract table in the first elements
  STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);

  SArray* pUidList = taosArrayInit(10, sizeof(uint64_t));

  int32_t numOfTables = tableListGetSize(pTableListInfo);
  for(int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
    taosArrayPush(pUidList, &pKeyInfo->uid);
  }

  taosArrayDestroy(plist);
  return pUidList;
}
1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384

static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SStreamScanInfo* pScanInfo = pOperator->info;
    STableScanInfo*  pTableScanInfo = pScanInfo->pTableScanOp->info;
    taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
  } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    STableScanInfo* pScanInfo = pOperator->info;
    taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
  } else {
    if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
      extractTableList(pList, pOperator->pDownstream[0]);
    }
  }
}

/*
 * Return a new array of STableListInfo pointers, filled by extractTableList() starting at the
 * task's root operator. The caller destroys the array.
 */
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
  SArray* pResult = taosArrayInit(0, POINTER_BYTES);
  extractTableList(pResult, pTaskInfo->pRoot);
  return pResult;
}

/* Call the root operator's releaseStreamStateFn on the root itself. Always returns 0. */
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tInfo)->pRoot;
  pRoot->fpSet.releaseStreamStateFn(pRoot);
  return 0;
}

/* Call the root operator's reloadStreamStateFn on the root itself. Always returns 0. */
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tInfo)->pRoot;
  pRoot->fpSet.reloadStreamStateFn(pRoot);
  return 0;
}