executor.c 48.2 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */

#include "executor.h"
17 18
#include <libs/transport/trpc.h>
#include <libs/wal/wal.h>
19 20
#include "executorInt.h"
#include "operator.h"
21
#include "planner.h"
22
#include "querytask.h"
L
Liu Jicong 已提交
23
#include "tdatablock.h"
L
Liu Jicong 已提交
24
#include "tref.h"
25
#include "tudf.h"
26 27

#include "storageapi.h"
28 29 30 31 32 33 34 35

// Guards the one-time creation of the exchange-operator ref pool (see qCreateExecTask).
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
// Id of the ref pool holding exchange operator objects; -1 until initRefPool() has run.
int32_t             exchangeObjRefPool = -1;

/*
 * Process-exit hook registered by initRefPool(): swaps the pool id stored in
 * exchangeObjRefPool for 0 with a compare-and-exchange and closes the pool identified by the
 * value the compare-and-exchange returned.
 */
static void cleanupRefPool() {
  int32_t expectedRef = exchangeObjRefPool;
  int32_t closingRef = atomic_val_compare_exchange_32(&exchangeObjRefPool, expectedRef, 0);
  taosCloseRef(closingRef);
}
36

X
Xiaoyu Wang 已提交
37 38
static void initRefPool() {
  exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
D
dapan1121 已提交
39 40 41
  atexit(cleanupRefPool);
}

L
Liu Jicong 已提交
42 43 44 45
/*
 * Walk down a single-child operator chain until the stream scan operator is reached, then queue
 * the SMA input on that operator's pBlockLists.  Every operator visited (including the stream
 * scan itself) is reset to OP_NOT_OPENED.
 *
 * Parameters:
 *   pOperator   - operator to start from (normally the task root)
 *   input       - payload whose layout depends on `type`:
 *                   STREAM_INPUT__MERGED_SUBMIT: array of `numOfBlocks` SPackedData
 *                   STREAM_INPUT__DATA_SUBMIT:   one SPackedData
 *                   STREAM_INPUT__DATA_BLOCK:    array of `numOfBlocks` SSDataBlock
 *   numOfBlocks - number of entries in `input`
 *   type        - one of the STREAM_INPUT__* kinds above; any other value queues nothing
 *   id          - task id string, used only in log messages
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_APP_ERROR when the walk hits an operator without
 * downstream or with more than one downstream (e.g. a join) before reaching a stream scan.
 */
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;

  SStreamScanInfo* pInfo = pOperator->info;

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    for (size_t i = 0; i < numOfBlocks; i++) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      taosArrayPush(pInfo->pBlockLists, pReq);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    // pBlockLists stores SPackedData elements, so copy from `input` itself (as doSetStreamBlock
    // does).  Pushing `&input` would copy sizeof(SPackedData) bytes starting at the local
    // pointer variable, reading past it on the stack.
    taosArrayPush(pInfo->pBlockLists, input);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {.pDataBlock = pDataBlock};
      taosArrayPush(pInfo->pBlockLists, &tmp);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
84
/*
 * Reset every operator above the stream scan operator to OP_NOT_OPENED, walking down a
 * single-child chain.  The stream scan operator itself is left unchanged.
 *
 * Returns TSDB_CODE_SUCCESS once the stream scan is reached, or TSDB_CODE_APP_ERROR if the walk
 * hits an operator with no downstream or with more than one downstream (e.g. a join).
 * `id` is the task id string and is only used for logging.
 */
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // The format used to be "%s" PRIx64, which printed a stray "lx"/"llx" after the id.
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetStreamOpOpen(pOperator->pDownstream[0], id);
  }

  return TSDB_CODE_SUCCESS;
}

102 103 104 105 106 107 108 109 110 111 112
/*
 * Find the stream scan operator below `pOperator` and call doClearBufferedBlocks() on its
 * SStreamScanInfo.
 *
 * A non-stream-scan operator is descended through only when it has exactly one downstream; for
 * a leaf or a multi-child operator the call does nothing.
 */
static void clearStreamBlock(SOperatorInfo* pOperator) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 1) {
      // C11 6.8.6.4: a void function must not `return <expr>;`, even if expr has void type.
      clearStreamBlock(pOperator->pDownstream[0]);
    }
    return;
  }

  SStreamScanInfo* pInfo = pOperator->info;
  doClearBufferedBlocks(pInfo);
}

5
54liuyao 已提交
113 114 115 116 117 118
/*
 * Reset a task's recorded error code to 0 and clear the blocks buffered on its stream scan
 * operator (see clearStreamBlock()).
 */
void resetTaskInfo(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pRoot = pTask->pRoot;

  pTask->code = 0;
  clearStreamBlock(pRoot);
}

119
/*
 * Walk down a single-child operator chain to the stream scan operator and queue a batch of
 * input on its pBlockLists.  Every operator visited is reset to OP_NOT_OPENED.
 *
 * Parameters:
 *   pOperator   - operator to start from (normally the task root)
 *   input       - payload whose layout depends on `type`:
 *                   STREAM_INPUT__MERGED_SUBMIT: array of `numOfBlocks` SPackedData
 *                   STREAM_INPUT__DATA_SUBMIT:   one SPackedData
 *                   STREAM_INPUT__DATA_BLOCK:    array of `numOfBlocks` SSDataBlock
 *   numOfBlocks - number of entries in `input`
 *   type        - one of the STREAM_INPUT__* kinds above; any other value trips ASSERT(0)
 *   id          - task id string, used only in log messages
 *
 * The stream scan is expected to have consumed its previous batch (asserted below).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_APP_ERROR when the walk hits an operator with no
 * downstream or with more than one downstream (e.g. a join) before reaching a stream scan.
 */
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // The format used to be "%s" PRIx64, which printed a stray "lx"/"llx" after the id.
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("s-task:%s in this batch, %d blocks need to be processed", id, (int32_t)numOfBlocks);
  ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    for (size_t i = 0; i < numOfBlocks; i++) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      taosArrayPush(pInfo->pBlockLists, pReq);
    }

    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    taosArrayPush(pInfo->pBlockLists, input);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {.pDataBlock = pDataBlock};
      taosArrayPush(pInfo->pBlockLists, &tmp);
    }

    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  } else {
    ASSERT(0);
  }

  return TSDB_CODE_SUCCESS;
}

165
/*
 * Propagate the owning task's id string to the tsdb reader of the stream scan operator.
 *
 * Descends through pDownstream[0] until a stream scan operator is found.  On that operator, if
 * it has a table-scan sub-operator with a data reader, the reader's task id is set to the
 * task's id.str through pAPI->tsdReader.tsdSetReaderTaskId.
 */
void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI *pAPI) {
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
    SStreamScanInfo* pStreamScanInfo = pOperator->info;
    if (pStreamScanInfo->pTableScanOp != NULL) {
      STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
      if (pScanInfo->base.dataReader != NULL) {
        pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str);
      }
    }
    return;
  }

  // A leaf operator that is not a stream scan has no pDownstream[0]; stop instead of
  // dereferencing a missing child.
  if (pOperator->numOfDownstream > 0) {
    doSetTaskId(pOperator->pDownstream[0], pAPI);
  }
}

H
Haojun Liao 已提交
180 181 182
/*
 * Store `queryId` on the task, rebuild its id string from (taskId, queryId) with buildTaskId(),
 * and push the new id string down to the stream scan's tsdb reader via doSetTaskId().
 */
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  SStorageAPI*   pStorage = &pTask->storageAPI;

  pTask->id.queryId = queryId;
  buildTaskId(taskId, queryId, pTask->id.str);

  // set the idstr for tsdbReader
  doSetTaskId(pTask->pRoot, pStorage);
}

L
Liu Jicong 已提交
189 190
/*
 * Public entry: reset the operators above the stream scan of `tinfo` to OP_NOT_OPENED (see
 * doSetStreamOpOpen()).  Returns TSDB_CODE_APP_ERROR for a NULL handle, otherwise the result of
 * doSetStreamOpOpen().
 */
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetStreamOpOpen(pTask->pRoot, GET_TASKID(pTask));

  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTask));
  }

  return code;
}
205

L
liuyao 已提交
206
/*
 * Report the task's stream data version and checkpoint id through the two out-parameters.
 * Both pointers must be non-NULL.
 */
void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId) {
  const SExecTaskInfo* pTask = (const SExecTaskInfo*)tinfo;
  const int64_t        version = pTask->streamInfo.dataVersion;
  const int64_t        checkpoint = pTask->streamInfo.checkPointId;

  *dataVer = version;
  *ckId = checkpoint;
}


L
Liu Jicong 已提交
213
/*
 * Queue a batch of stream input on the task's stream scan operator (see doSetStreamBlock()).
 *
 * Returns TSDB_CODE_APP_ERROR for a NULL task handle.  Returns TSDB_CODE_SUCCESS without doing
 * anything when there is no input (NULL `pBlocks` or zero `numOfBlocks`).  Otherwise returns
 * the result of doSetStreamBlock().
 */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  bool hasInput = (pBlocks != NULL) && (numOfBlocks != 0);
  if (!hasInput) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetStreamBlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the stream block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the stream block data", GET_TASKID(pTask));
  }

  return code;
}

L
Liu Jicong 已提交
234 235
/*
 * Queue SMA input on the task's stream scan operator (see doSetSMABlock()).
 *
 * Returns TSDB_CODE_APP_ERROR for a NULL task handle.  Returns TSDB_CODE_SUCCESS without doing
 * anything when there is no input (NULL `pBlocks` or zero `numOfBlocks`).  Otherwise returns
 * the result of doSetSMABlock().
 */
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  bool hasInput = (pBlocks != NULL) && (numOfBlocks != 0);
  if (!hasInput) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  int32_t        code = doSetSMABlock(pTask->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTask));

  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTask));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTask));
  }

  return code;
}

X
Xiaoyu Wang 已提交
255 256 257
/*
 * Create a task that runs in OPTR_EXEC_MODEL_QUEUE mode.
 *
 * msg != NULL: `msg` is decoded with qStringToSubplan() and passed to qCreateExecTask().  On
 *              success, *numOfCols receives the number of slots with `output` set in the root
 *              node's output data block descriptor.
 * msg == NULL: a raw-scan task is built (doCreateTask + createRawScanOperatorInfo) and
 *              *numOfCols is not written.
 *
 * Returns the task handle, or NULL with terrno set on failure.
 */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
                                     uint64_t id) {
  if (msg != NULL) {
    SSubplan* pPlan = NULL;
    int32_t   code = qStringToSubplan(msg, &pPlan);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = code;
      return NULL;
    }

    qTaskInfo_t pTaskInfo = NULL;
    code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
    if (code != TSDB_CODE_SUCCESS) {
      nodesDestroyNode((SNode*)pPlan);
      qDestroyTask(pTaskInfo);
      terrno = code;
      return NULL;
    }

    // extract the number of output columns
    SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc;
    int32_t             outputCols = 0;
    SNode*              pSlot = NULL;
    FOREACH(pSlot, pDescNode->pSlots) {
      if (((SSlotDescNode*)pSlot)->output) {
        outputCols += 1;
      }
    }

    *numOfCols = outputCols;
    return pTaskInfo;
  }

  // msg == NULL: create raw scan
  SExecTaskInfo* pRawTask = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api);
  if (pRawTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRawTask->pRoot = createRawScanOperatorInfo(pReaderHandle, pRawTask);
  if (pRawTask->pRoot == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // NOTE(review): only the task struct is freed here; confirm doCreateTask allocates nothing
    // else that would need doDestroyTask().
    taosMemoryFree(pRawTask);
    return NULL;
  }

  pRawTask->storageAPI = pReaderHandle->api;
  qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pRawTask));
  return pRawTask;
}

307
/*
 * Decode `msg` into a subplan and create a task for it in OPTR_EXEC_MODEL_STREAM mode.  On
 * success, qStreamInfoResetTimewindowFilter() is applied to the new task before it is returned.
 *
 * Returns NULL when `msg` is NULL (terrno untouched), or NULL with terrno set when decoding or
 * task creation fails.
 */
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) {
  if (msg == NULL) {
    return NULL;
  }

  SSubplan*   pPlan = NULL;
  qTaskInfo_t pTaskInfo = NULL;

  int32_t code = qStringToSubplan(msg, &pPlan);
  if (code == TSDB_CODE_SUCCESS) {
    code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
    if (code != TSDB_CODE_SUCCESS) {
      nodesDestroyNode((SNode*)pPlan);
      qDestroyTask(pTaskInfo);
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qStreamInfoResetTimewindowFilter(pTaskInfo);
  return pTaskInfo;
}
331

332 333
/*
 * From `tableIdList` (tb_uid_t values), keep only the tables this stream scan should pick up.
 *
 * A uid is dropped when:
 *  - its meta entry cannot be read;
 *  - it is a super table;
 *  - the scan source is a super table and the entry is not a child table of that super table;
 *  - the scan source is not a super table and the entry's uid differs from the source uid;
 *  - a tag condition is set and isQualifiedTable() fails or reports it unqualified.
 *
 * Returns a newly created array of the surviving uids; the caller must destroy it.
 */
static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,
                                       SStorageAPI* pAPI) {
  SArray* pQualified = taosArrayInit(4, sizeof(tb_uid_t));
  int32_t total = taosArrayGetSize(tableIdList);
  if (total == 0) {
    return pQualified;
  }

  STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;

  uint64_t srcSuid = 0;
  uint64_t srcUid = 0;
  int32_t  srcType = 0;
  tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &srcSuid, &srcUid, &srcType);

  // let's discard the tables those are not created according to the queried super table.
  SMetaReader mr = {0};
  pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, 0, &pAPI->metaFn);

  for (int32_t idx = 0; idx < total; ++idx) {
    uint64_t* pUid = (uint64_t*)taosArrayGet(tableIdList, idx);

    int32_t code = pAPI->metaReaderFn.getTableEntryByUid(&mr, *pUid);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *pUid, tstrerror(terrno), idstr);
      continue;
    }

    tDecoderClear(&mr.coder);

    if (mr.me.type == TSDB_SUPER_TABLE) {
      continue;
    }

    if (srcType == TSDB_SUPER_TABLE) {
      // this new created child table does not belong to the scanned super table.
      bool isChildOfSource = (mr.me.type == TSDB_CHILD_TABLE) && (mr.me.ctbEntry.suid == srcSuid);
      if (!isChildOfSource) {
        continue;
      }
    } else if (mr.me.uid != srcUid) {
      // Ordinary-table source: when the WAL is replayed during vnode restore, every newly
      // created ordinary table is checked so that only the destination table is kept.
      continue;
    }

    if (pScanInfo->pTagCond != NULL) {
      bool          qualified = false;
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};
      code = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.vnode, &qualified, pAPI);
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
        continue;
      }

      if (!qualified) {
        continue;
      }
    }

    // handle multiple partition
    taosArrayPush(pQualified, pUid);
  }

  pAPI->metaReaderFn.clearReader(&mr);
  return pQualified;
}

400
/*
 * Add tables to, or remove tables from, the task's stream scanner.
 *
 * isAdd == true:  the uids in `tableIdList` are filtered with filterUnqualifiedTables().  The
 *                 survivors are registered with the tq reader, then appended to the table-scan
 *                 table list together with a group id:
 *                   - 0 when there are no group tags;
 *                   - the uid itself when grouping by tbname;
 *                   - otherwise the id computed by getGroupIdFromTagsVal().
 *                 The table-list update happens under the task write latch.
 * isAdd == false: the uids are removed from the tq reader under the task write latch.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.  Returns TSDB_CODE_APP_ERROR when
 * the task has no stream scan operator.
 */
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  const char*    id = GET_TASKID(pTaskInfo);
  int32_t        code = 0;

  if (isAdd) {
    qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);
  }

  // traverse to the stream scanner node to add this table id
  SOperatorInfo* pInfo = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
  if (pInfo == NULL) {
    // extractOperatorInTree can fail to find the operator; do not dereference NULL below.
    qError("failed to find stream scan operator to update table list, %s", id);
    return TSDB_CODE_APP_ERROR;
  }
  SStreamScanInfo* pScanInfo = pInfo->info;

  if (isAdd) {  // add new table id
    SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, id, &pTaskInfo->storageAPI);
    int32_t numOfQualifiedTables = taosArrayGetSize(qa);
    qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
    code = pTaskInfo->storageAPI.tqReaderFn.tqReaderAddTables(pScanInfo->tqReader, qa);
    if (code != TSDB_CODE_SUCCESS) {
      taosArrayDestroy(qa);
      return code;
    }

    bool   assignUid = false;
    size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
    char*  keyBuf = NULL;
    if (bufLen > 0) {
      assignUid = groupbyTbname(pScanInfo->pGroupTags);
      keyBuf = taosMemoryMalloc(bufLen);
      if (keyBuf == NULL) {
        taosArrayDestroy(qa);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo;
    taosWLockLatch(&pTaskInfo->lock);

    for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
      uint64_t*     uid = taosArrayGet(qa, i);
      STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};

      if (bufLen > 0) {
        if (assignUid) {
          keyInfo.groupId = keyInfo.uid;
        } else {
          code = getGroupIdFromTagsVal(pScanInfo->readHandle.vnode, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                       &keyInfo.groupId, &pTaskInfo->storageAPI);
          if (code != TSDB_CODE_SUCCESS) {
            taosMemoryFree(keyBuf);
            taosArrayDestroy(qa);
            taosWUnLockLatch(&pTaskInfo->lock);
            return code;
          }
        }
      }

      tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
    }

    taosWUnLockLatch(&pTaskInfo->lock);
    if (keyBuf != NULL) {
      taosMemoryFree(keyBuf);
    }

    taosArrayDestroy(qa);
  } else {  // remove the table id in current list
    qDebug("%d remove child tables from the stream scanner, %s", (int32_t)taosArrayGetSize(tableIdList), id);
    taosWLockLatch(&pTaskInfo->lock);
    code = pTaskInfo->storageAPI.tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
    taosWUnLockLatch(&pTaskInfo->lock);
  }

  return code;
}
475

476
/*
 * Report the schema version, tag version, db name and table name recorded in the task's
 * schemaInfo.
 *
 * When schemaInfo.sw is NULL nothing is written and TSDB_CODE_SUCCESS is returned.  Otherwise
 * *sversion comes from sw->version and *tversion from schemaInfo.tversion.  A NULL dbname or
 * tablename yields an empty string in the corresponding output buffer.
 *
 * NOTE(review): names are copied with strcpy; callers must supply buffers large enough for the
 * stored names — confirm the buffer sizes at the call sites.
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
                                    int32_t* tversion) {
  ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  if (pTask->schemaInfo.sw == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  *sversion = pTask->schemaInfo.sw->version;
  *tversion = pTask->schemaInfo.tversion;

  const char* pStoredDb = pTask->schemaInfo.dbname;
  if (pStoredDb == NULL) {
    dbName[0] = 0;
  } else {
    strcpy(dbName, pStoredDb);
  }

  const char* pStoredTable = pTask->schemaInfo.tablename;
  if (pStoredTable == NULL) {
    tableName[0] = 0;
  } else {
    strcpy(tableName, pStoredTable);
  }

  return 0;
}
500 501

int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
D
dapan1121 已提交
502
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
503 504 505
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
  taosThreadOnce(&initPoolOnce, initRefPool);

506
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
507

508
  int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
509
  if (code != TSDB_CODE_SUCCESS) {
510
    qError("failed to createExecTaskInfo, code: %s", tstrerror(code));
511 512 513 514
    goto _error;
  }

  if (handle) {
D
dapan1121 已提交
515 516 517 518 519 520 521 522
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
    void* pSinkManager = NULL;
    code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
      goto _error;
    }

523
    void* pSinkParam = NULL;
524
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
525
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
526
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
D
dapan1121 已提交
527
      taosMemoryFree(pSinkManager);
528 529 530
      goto _error;
    }

H
Haojun Liao 已提交
531
    // pSinkParam has been freed during create sinker.
D
dapan1121 已提交
532
    code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
533 534
  }

535
  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
536

dengyihao's avatar
dengyihao 已提交
537
_error:
538 539 540 541
  // if failed to add ref for all tables in this query, abort current query
  return code;
}

H
Haojun Liao 已提交
542
static void freeBlock(void* param) {
543
  SSDataBlock* pBlock = *(SSDataBlock**)param;
H
Haojun Liao 已提交
544 545 546
  blockDataDestroy(pBlock);
}

547
/*
 * Run the task's root operator and collect result blocks into `pResList`.
 *
 * Collection stops when the root returns NULL or at least 4096 rows have been gathered.  Blocks
 * are copies kept in pTaskInfo->pResultBlockList and reused across calls, so the pointers in
 * `pResList` stay owned by the task.
 *
 * Parameters:
 *   pResList - cleared on entry, then filled with SSDataBlock* entries
 *   useconds - set to the task's accumulated elapsed time once the root is exhausted
 *   hasMore  - true when collection stopped before the root returned NULL
 *   pLocal   - optional local-fetch settings copied into the task
 *
 * Ownership: the calling thread is claimed with a CAS on pTaskInfo->owner.  If another thread
 * owns the task, TSDB_CODE_QRY_IN_EXEC is recorded and returned.  An already killed task
 * returns TSDB_CODE_SUCCESS.  Errors raised via longjmp to pTaskInfo->env are recorded in
 * pTaskInfo->code and returned.
 */
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClear(pResList);

  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;
  int64_t      st = taosGetTimestampUs();
  int32_t      blockIndex = 0;

  while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
    SSDataBlock* p = NULL;
    if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
      p = createOneDataBlock(pRes, true);
      if (p == NULL) {
        // Allocation failed: report it instead of dereferencing NULL below.
        pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
        break;
      }
      taosArrayPush(pTaskInfo->pResultBlockList, &p);
    } else {
      p = *(SSDataBlock**)taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
      copyDataBlock(p, pRes);
    }

    blockIndex += 1;

    current += p->info.rows;
    ASSERT(p->info.rows > 0);
    taosArrayPush(pResList, &p);

    // stop once this call has gathered at least 4096 rows
    if (current >= 4096) {
      break;
    }
  }

  *hasMore = (pRes != NULL);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  // `total` is unsigned, so it is printed with PRIu64 (PRId64 mismatched the argument type).
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

H
Haojun Liao 已提交
634 635
/*
 * Destroy every cached result block in pTaskInfo->pResultBlockList (the buffers reused by
 * qExecTaskOpt) and leave the list empty.
 */
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SArray*        pBlocks = pTaskInfo->pResultBlockList;
  size_t         count = taosArrayGetSize(pBlocks);

  for (int32_t idx = 0; idx < count; ++idx) {
    SSDataBlock* pBlock = *(SSDataBlock**)taosArrayGet(pBlocks, idx);
    blockDataDestroy(pBlock);
  }

  taosArrayClear(pBlocks);
}

646 647 648 649 650 651
/*
 * Pull one block from the task's root operator into *pRes.
 *
 * Returns TSDB_CODE_SUCCESS with *pRes == NULL when the task was already killed; the buffered
 * stream blocks are cleared in that case.  Returns TSDB_CODE_QRY_IN_EXEC when another thread
 * owns the task.  Errors raised via longjmp to pTaskInfo->env are recorded in pTaskInfo->code
 * and returned.  *useconds receives the accumulated elapsed time once the root returns NULL.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;

  // todo extract method
  taosRLockLatch(&pTaskInfo->lock);
  bool isKilled = isTaskKilled(pTaskInfo);
  if (isKilled) {
    clearStreamBlock(pTaskInfo->pRoot);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));

    taosRUnLockLatch(&pTaskInfo->lock);
    return TSDB_CODE_SUCCESS;
  }

  // The latch is only held in read (shared) mode, so several callers can get here at once.
  // Claim ownership with a compare-and-exchange (as qExecTaskOpt does) so that exactly one of
  // them wins, and log the thread that actually owns the task.
  int64_t curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId);
  if (curOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;

    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  taosRUnLockLatch(&pTaskInfo->lock);

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  // `total` is unsigned, so it is printed with PRIu64 (PRId64 mismatched the argument type).
  qDebug("%s task suspended, %d rows returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}

L
Liu Jicong 已提交
713
/*
 * Append a copy of `pInfo` to the task's stop-info list while holding the stop-info write
 * latch.  Always returns TSDB_CODE_SUCCESS.
 */
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  // the stop-info list is guarded by its own latch, separate from the task latch
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  (void)taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  return TSDB_CODE_SUCCESS;
}

/*
 * Three-way comparator for SExchangeOpStopInfo ordered by refId: negative, zero or positive
 * when `lp`'s refId is less than, equal to, or greater than `rp`'s.
 */
int32_t stopInfoComp(void const* lp, void const* rp) {
  const SExchangeOpStopInfo* pLeft = (const SExchangeOpStopInfo*)lp;
  const SExchangeOpStopInfo* pRight = (const SExchangeOpStopInfo*)rp;

  if (pLeft->refId == pRight->refId) {
    return 0;
  }
  return (pLeft->refId < pRight->refId) ? -1 : 1;
}

L
Liu Jicong 已提交
734
/*
 * Remove the stop-info entry whose refId matches `pInfo` (located with taosArraySearchIdx and
 * stopInfoComp), while holding the stop-info write latch.  Nothing happens if no entry matches.
 *
 * NOTE(review): if taosArraySearchIdx is a binary search, this relies on entries being appended
 * in ascending refId order — confirm.
 */
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  SArray* pList = pTaskInfo->stopInfo.pStopInfo;
  int32_t pos = taosArraySearchIdx(pList, pInfo, stopInfoComp, TD_EQ);
  if (pos >= 0) {
    taosArrayRemove(pList, pos);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}

/*
 * For every registered stop-info entry, acquire the exchange operator from the ref pool by
 * refId and post its `ready` semaphore, then release the reference.  Entries whose object
 * cannot be acquired are skipped.  Runs under the stop-info write latch.
 */
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  SArray* pList = pTaskInfo->stopInfo.pStopInfo;
  int32_t total = taosArrayGetSize(pList);
  for (int32_t idx = 0; idx < total; ++idx) {
    SExchangeOpStopInfo* pStop = taosArrayGet(pList, idx);
    SExchangeInfo*       pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId);
    if (pExchangeInfo == NULL) {
      continue;  // object no longer available in the ref pool
    }

    tsem_post(&pExchangeInfo->ready);
    taosReleaseRef(exchangeObjRefPool, pStop->refId);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}

D
dapan1121 已提交
759
/*
 * Mark the task killed with `rspCode` and wake its exchange operators (qStopTaskOperators),
 * without waiting for an executing thread to finish.  Returns TSDB_CODE_QRY_INVALID_QHANDLE
 * for a NULL handle, TSDB_CODE_SUCCESS otherwise.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  if (!pTaskInfo) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));

  setTaskKilled(pTaskInfo, rspCode);
  qStopTaskOperators(pTaskInfo);
  return TSDB_CODE_SUCCESS;
}

773 774 775 776 777 778
/*
 * Mark the task killed (TSDB_CODE_TSC_QUERY_KILLED).  Then, holding the task write latch, poll
 * every 10 ms until no thread owns the task, and finally record `rspCode` as the task's code.
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
 */
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (!pTaskInfo) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
  setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);

  taosWLockLatch(&pTaskInfo->lock);
  for (;;) {
    if (!qTaskIsExecuting(pTaskInfo)) {
      break;
    }
    taosMsleep(10);
  }
  pTaskInfo->code = rspCode;
  taosWUnLockLatch(&pTaskInfo->lock);

  return TSDB_CODE_SUCCESS;
}

792 793 794 795 796 797 798 799 800
/* True when `qinfo` is non-NULL and its owner field (read atomically) is non-zero. */
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  return (pTaskInfo != NULL) && (atomic_load_64(&pTaskInfo->owner) != 0);
}

H
Haojun Liao 已提交
801 802
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo* pSummary = &pTaskInfo->cost;
803
  int64_t        idleTime = pSummary->start - pSummary->created;
H
Haojun Liao 已提交
804 805 806 807

  SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
  if (pSummary->pRecoder != NULL) {
    qDebug(
808
        "%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
809
        "createGroupIdMap:%.2f ms, total blocks:%d, "
H
Haojun Liao 已提交
810
        "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
811 812 813 814 815 816
        GET_TASKID(pTaskInfo), idleTime / 1000.0, pSummary->elapsedTime / 1000.0, pSummary->extractListTime,
        pSummary->groupIdMapTime, pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks,
        pRecorder->totalRows, pRecorder->totalCheckedRows);
  } else {
    qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idleTime / 1000.0,
           pSummary->elapsedTime / 1000.0);
H
Haojun Liao 已提交
817 818 819
  }
}

820 821 822 823 824 825
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

H
Haojun Liao 已提交
826 827 828 829 830
  if (pTaskInfo->pRoot != NULL) {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
  } else {
    qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
  }
831

H
Haojun Liao 已提交
832
  printTaskExecCostInLog(pTaskInfo);  // print the query cost summary
833 834 835
  doDestroyTask(pTaskInfo);
}

H
Haojun Liao 已提交
836
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
837
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
H
Haojun Liao 已提交
838
  return getOperatorExplainExecInfo(pTaskInfo->pRoot, pExecInfoList);
839 840 841 842 843 844 845 846 847
}

/*
 * Serialize the execution status of the task's operator tree.
 *
 * Operator encoding is currently disabled: on success nothing is written to `*pOutput` or `*len`
 * and 0 is returned. Returns TSDB_CODE_INVALID_PARA when the handle is NULL (checked now, in line
 * with qDeserializeTaskStatus) or the task has no root operator.
 *
 * TODO: re-enable encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); when it succeeds
 * but no operator carried a value, free *pOutput and reset *len to 0.
 */
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL || pTaskInfo->pRoot == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  return 0;
}

/*
 * Restore the operator-tree status of a task from `pInput` (`len` bytes).
 *
 * Decoding is currently disabled (the intended call is decodeOperator(pTaskInfo->pRoot, pInput,
 * len)): after validating the arguments the task is left untouched and 0 is returned. Returns
 * TSDB_CODE_INVALID_PARA for a NULL handle, NULL input or zero length.
 */
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
  bool invalidArgs = (tinfo == NULL) || (pInput == NULL) || (len == 0);
  return invalidArgs ? TSDB_CODE_INVALID_PARA : 0;
}

/*
 * Follow the single-downstream chain from the task's root operator and return, through `scanner`,
 * the `info` of the first QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN operator.
 *
 * Returns 0 on success. Returns -1 (leaving *scanner untouched) when the chain ends or branches
 * before a stream scan is found. Previously this case was only guarded by an ASSERT; when the
 * assertion did not abort, pDownstream[0] of a leaf operator was dereferenced.
 */
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (pOperator != NULL) {
    if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return 0;
    }

    if (pOperator->numOfDownstream != 1) {
      qError("%s failed to find stream scan operator, numOfDownstream:%d", GET_TASKID(pTaskInfo),
             (int32_t)pOperator->numOfDownstream);
      return -1;
    }

    pOperator = pOperator->pDownstream[0];
  }

  qError("%s failed to find stream scan operator", GET_TASKID(pTaskInfo));
  return -1;
}

L
liuyao 已提交
883
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
884 885
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
886 887 888 889 890 891 892

  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;

  pStreamInfo->fillHistoryVer = *pVerRange;
  pStreamInfo->fillHistoryWindow = *pWindow;
  pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1;

893
  qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
L
liuyao 已提交
894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909
         " - %" PRId64,
         GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
         pWindow->ekey);
  return 0;
}

/*
 * Configure step 2 of the scan-history procedure of a stream task: record the version range and
 * time window to scan in streamInfo and set the recover step to STREAM_RECOVER_STEP__PREPARE2.
 * Always returns 0.
 */
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);

  SStreamTaskInfo* pStream = &pTaskInfo->streamInfo;
  pStream->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
  pStream->fillHistoryWindow = *pWindow;
  pStream->fillHistoryVer = *pVerRange;

  const SVersionRange* pVer = &pStream->fillHistoryVer;
  qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
         ", window:%" PRId64 " - %" PRId64,
         GET_TASKID(pTaskInfo), pVer->minVer, pVer->maxVer, pWindow->skey, pWindow->ekey);
  return 0;
}

/*
 * Leave scan-history mode: reset the stream task's recover step to STREAM_RECOVER_STEP__NONE.
 * Always returns 0.
 */
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  SStreamTaskInfo* pStream = &pTask->streamInfo;
  pStream->recoverStep = STREAM_RECOVER_STEP__NONE;
  return 0;
}

924
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
925 926 927 928
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (1) {
929 930 931
    int32_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
        type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
932
      SStreamIntervalOperatorInfo* pInfo = pOperator->info;
933
      STimeWindowAggSupp* pSup = &pInfo->twAggSup;
L
Liu Jicong 已提交
934

935 936
      ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
      ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
L
Liu Jicong 已提交
937

938 939 940 941 942 943
      qInfo("save stream param for interval: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);

      pSup->calTriggerSaved = pSup->calTrigger;
      pSup->deleteMarkSaved = pSup->deleteMark;
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
      pSup->deleteMark = INT64_MAX;
944 945
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
      pInfo->ignoreExpiredData = false;
946 947 948
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
949
      SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
950 951 952 953
      STimeWindowAggSupp* pSup = &pInfo->twAggSup;

      ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
      ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
L
Liu Jicong 已提交
954

955
      qInfo("save stream param for session: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);
L
Liu Jicong 已提交
956

957 958 959 960
      pSup->calTriggerSaved = pSup->calTrigger;
      pSup->deleteMarkSaved = pSup->deleteMark;
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
      pSup->deleteMark = INT64_MAX;
961 962
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
      pInfo->ignoreExpiredData = false;
963
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
964
      SStreamStateAggOperatorInfo* pInfo = pOperator->info;
965 966 967 968
      STimeWindowAggSupp* pSup = &pInfo->twAggSup;

      ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
      ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0);
L
Liu Jicong 已提交
969

970
      qInfo("save stream param for state: %d,  %" PRId64, pSup->calTrigger, pSup->deleteMark);
L
Liu Jicong 已提交
971

972 973 974 975
      pSup->calTriggerSaved = pSup->calTrigger;
      pSup->deleteMarkSaved = pSup->deleteMark;
      pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;
      pSup->deleteMark = INT64_MAX;
976 977
      pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
      pInfo->ignoreExpiredData = false;
978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995
    }

    // iterate operator tree
    if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
      if (pOperator->numOfDownstream > 1) {
        qError("unexpected stream, multiple downstream");
        ASSERT(0);
        return -1;
      }
      return 0;
    } else {
      pOperator = pOperator->pDownstream[0];
    }
  }

  return 0;
}

996
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
997
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
998
  const char*    id = GET_TASKID(pTaskInfo);
999 1000 1001
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (1) {
1002 1003 1004
    uint16_t type = pOperator->operatorType;
    if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
        type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
1005
      SStreamIntervalOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1006 1007
      pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
      pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
1008
      pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
1009 1010 1011 1012 1013
      qInfo("%s restore stream agg executors param for interval: %d,  %" PRId64, id, pInfo->twAggSup.calTrigger,
            pInfo->twAggSup.deleteMark);
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
               type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
1014
      SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1015 1016
      pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
      pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
1017
      pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
1018 1019 1020
      qInfo("%s restore stream agg executor param for session: %d,  %" PRId64, id, pInfo->twAggSup.calTrigger,
            pInfo->twAggSup.deleteMark);
    } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
1021
      SStreamStateAggOperatorInfo* pInfo = pOperator->info;
L
Liu Jicong 已提交
1022 1023
      pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
      pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
1024
      pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
1025 1026
      qInfo("%s restore stream agg executor param for state: %d,  %" PRId64, id, pInfo->twAggSup.calTrigger,
            pInfo->twAggSup.deleteMark);
1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039
    }

    // iterate operator tree
    if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
      if (pOperator->numOfDownstream > 1) {
        qError("unexpected stream, multiple downstream");
        return -1;
      }
      return 0;
    } else {
      pOperator = pOperator->pDownstream[0];
    }
  }
1040
}
1041

L
Liu Jicong 已提交
1042 1043 1044 1045
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.recoverScanFinished;
}
1046

1047
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
L
liuyao 已提交
1048
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1049
  STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
L
liuyao 已提交
1050

1051
  qDebug("%s remove scan-history filter window:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
1052
         GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);
1053

1054 1055
  pWindow->skey = INT64_MIN;
  pWindow->ekey = INT64_MAX;
L
liuyao 已提交
1056 1057 1058
  return 0;
}

1059 1060 1061 1062 1063
void* qExtractReaderFromStreamScanner(void* scanner) {
  SStreamScanInfo* pInfo = scanner;
  return (void*)pInfo->tqReader;
}

wmmhello's avatar
wmmhello 已提交
1064 1065 1066 1067 1068 1069 1070 1071
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->streamInfo.schema;
}

/* Return the table name stored in the task's streamInfo (points into the task, not copied). */
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  const SStreamTaskInfo* pStream = &((SExecTaskInfo*)tinfo)->streamInfo;
  return pStream->tbName;
}

wmmhello's avatar
wmmhello 已提交
1074
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
1075
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
wmmhello's avatar
wmmhello 已提交
1076
  return &pTaskInfo->streamInfo.metaRsp;
1077 1078
}

1079
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
wmmhello's avatar
wmmhello 已提交
1080
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1081
  memcpy(pOffset, &pTaskInfo->streamInfo.currentOffset, sizeof(STqOffsetVal));
1082 1083
}

H
Haojun Liao 已提交
1084
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
1085 1086
  memset(pCond, 0, sizeof(SQueryTableDataCond));
  pCond->order = TSDB_ORDER_ASC;
H
Haojun Liao 已提交
1087
  pCond->numOfCols = pMtInfo->schema->nCols;
1088
  pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
H
Haojun Liao 已提交
1089 1090 1091 1092
  pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
  if (pCond->colList == NULL || pCond->pSlotList == NULL) {
    taosMemoryFreeClear(pCond->colList);
    taosMemoryFreeClear(pCond->pSlotList);
S
Shengliang Guan 已提交
1093
    terrno = TSDB_CODE_OUT_OF_MEMORY;
1094 1095 1096
    return terrno;
  }

H
Haojun Liao 已提交
1097
  pCond->twindows = TSWINDOW_INITIALIZER;
H
Haojun Liao 已提交
1098
  pCond->suid = pMtInfo->suid;
1099 1100 1101 1102 1103
  pCond->type = TIMEWINDOW_RANGE_CONTAINED;
  pCond->startVersion = -1;
  pCond->endVersion = sContext->snapVersion;

  for (int32_t i = 0; i < pCond->numOfCols; ++i) {
H
Haojun Liao 已提交
1104 1105 1106 1107 1108 1109
    SColumnInfo* pColInfo = &pCond->colList[i];
    pColInfo->type = pMtInfo->schema->pSchema[i].type;
    pColInfo->bytes = pMtInfo->schema->pSchema[i].bytes;
    pColInfo->colId = pMtInfo->schema->pSchema[i].colId;

    pCond->pSlotList[i] = i;
1110 1111 1112 1113 1114
  }

  return TSDB_CODE_SUCCESS;
}

1115 1116
void qStreamSetOpen(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
X
Xiaoyu Wang 已提交
1117
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
1118 1119 1120
  pOperator->status = OP_NOT_OPENED;
}

1121
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
1122
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
1123 1124
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;

1125
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
X
Xiaoyu Wang 已提交
1126
  const char*    id = GET_TASKID(pTaskInfo);
1127

1128 1129 1130 1131 1132 1133 1134 1135 1136 1137
  if(subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG){
    pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
    if (pOperator == NULL) {
      return -1;
    }
    SStreamScanInfo* pInfo = pOperator->info;
    SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
    SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
    walReaderVerifyOffset(pWalReader, pOffset);
  }
wmmhello's avatar
wmmhello 已提交
1138
  // if pOffset equal to current offset, means continue consume
1139
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
1140 1141
    return 0;
  }
1142

1143
  if (subType == TOPIC_SUB_TYPE__COLUMN) {
1144
    pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
wmmhello's avatar
wmmhello 已提交
1145 1146
    if (pOperator == NULL) {
      return -1;
1147
    }
1148

1149
    SStreamScanInfo* pInfo = pOperator->info;
1150 1151
    STableScanInfo*  pScanInfo = pInfo->pTableScanOp->info;
    STableScanBase*  pScanBaseInfo = &pScanInfo->base;
1152
    STableListInfo*  pTableListInfo = pScanBaseInfo->pTableListInfo;
1153

1154
    if (pOffset->type == TMQ_OFFSET__LOG) {
1155
      // todo refactor: move away
1156
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader);
1157 1158
      pScanBaseInfo->dataReader = NULL;

H
Haojun Liao 已提交
1159 1160 1161
      SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
      SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
      walReaderVerifyOffset(pWalReader, pOffset);
1162 1163
      if (pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version, id) < 0) {
        qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version, id);
H
Haojun Liao 已提交
1164 1165
        return -1;
      }
1166
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
1167 1168
      // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
      // those data are from the snapshot in tsdb, besides the data in the wal file.
1169 1170
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;
1171
      int32_t index = 0;
1172

1173 1174 1175 1176
      // this value may be changed if new tables are created
      taosRLockLatch(&pTaskInfo->lock);
      int32_t numOfTables = tableListGetSize(pTableListInfo);

1177
      if (uid == 0) {
1178 1179
        if (numOfTables != 0) {
          STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0);
1180 1181
          uid = pTableInfo->uid;
          ts = INT64_MIN;
1182
          pScanInfo->currentTable = 0;
1183
        } else {
1184 1185
          taosRUnLockLatch(&pTaskInfo->lock);
          qError("no table in table list, %s", id);
1186
          terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
L
Liu Jicong 已提交
1187 1188
          return -1;
        }
1189
      }
H
Haojun Liao 已提交
1190

X
Xiaoyu Wang 已提交
1191 1192
      qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% " PRId64 " rows returned", uid, ts,
             pInfo->pTableScanOp->resultInfo.totalRows);
1193
      pInfo->pTableScanOp->resultInfo.totalRows = 0;
H
Haojun Liao 已提交
1194

1195
      // start from current accessed position
H
Haojun Liao 已提交
1196 1197 1198
      // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
      // position, let's find it from the beginning.
      index = tableListFind(pTableListInfo, uid, 0);
1199
      taosRUnLockLatch(&pTaskInfo->lock);
1200

1201 1202 1203
      if (index >= 0) {
        pScanInfo->currentTable = index;
      } else {
H
Haojun Liao 已提交
1204 1205
        qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
               numOfTables, pScanInfo->currentTable, id);
1206
        terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
wmmhello's avatar
wmmhello 已提交
1207 1208
        return -1;
      }
1209

1210
      STableKeyInfo keyInfo = {.uid = uid};
X
Xiaoyu Wang 已提交
1211
      int64_t       oldSkey = pScanBaseInfo->cond.twindows.skey;
1212 1213 1214

      // let's start from the next ts that returned to consumer.
      pScanBaseInfo->cond.twindows.skey = ts + 1;
H
Haojun Liao 已提交
1215
      pScanInfo->scanTimes = 0;
1216

1217
      if (pScanBaseInfo->dataReader == NULL) {
1218
        int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
1219
                                      pScanInfo->pResBlock, (void**) &pScanBaseInfo->dataReader, id, false, NULL);
1220 1221 1222
        if (code != TSDB_CODE_SUCCESS) {
          qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
          terrno = code;
wmmhello's avatar
wmmhello 已提交
1223
          return -1;
L
Liu Jicong 已提交
1224
        }
1225

H
Haojun Liao 已提交
1226 1227
        qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
1228
      } else {
1229 1230
        pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
        pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
1231
        qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 "  table index:%d numOfTable:%d, %s",
H
Haojun Liao 已提交
1232
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
1233
      }
1234 1235 1236

      // restore the key value
      pScanBaseInfo->cond.twindows.skey = oldSkey;
1237
    } else {
1238
      qError("invalid pOffset->type:%d, %s", pOffset->type, id);
1239
      terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
1240 1241 1242
      return -1;
    }

1243
  } else {  // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
1244

1245 1246 1247
    if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
1248

X
Xiaoyu Wang 已提交
1249
      SOperatorInfo*  p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id);
1250 1251
      STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;

1252
      if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
X
Xiaoyu Wang 已提交
1253
        qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
1254
        terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
1255 1256
        return -1;
      }
H
Haojun Liao 已提交
1257

H
Haojun Liao 已提交
1258
      SMetaTableInfo mtInfo = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext);
1259
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
1260
      pInfo->dataReader = NULL;
H
Haojun Liao 已提交
1261

1262 1263
      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tableListClear(pTableListInfo);
1264

1265
      if (mtInfo.uid == 0) {
1266
        goto end;  // no data
1267
      }
1268

1269 1270
      initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
      pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
H
Haojun Liao 已提交
1271

1272
      tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
1273

1274 1275
      STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
      int32_t        size = tableListGetSize(pTableListInfo);
1276

1277
      pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, (void**) &pInfo->dataReader, NULL,
1278
                     false, NULL);
L
Liu Jicong 已提交
1279

1280 1281
      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
1282
      tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
1283 1284
      pTaskInfo->streamInfo.schema = mtInfo.schema;

X
Xiaoyu Wang 已提交
1285
      qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
1286 1287 1288
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
1289
      if (pTaskInfo->storageAPI.snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
1290
        qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
1291
        terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
1292 1293
        return -1;
      }
X
Xiaoyu Wang 已提交
1294 1295
      qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts,
             id);
1296 1297
    } else if (pOffset->type == TMQ_OFFSET__LOG) {
      SStreamRawScanInfo* pInfo = pOperator->info;
1298
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
1299
      pInfo->dataReader = NULL;
1300
      qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
wmmhello's avatar
wmmhello 已提交
1301
    }
1302
  }
1303 1304

end:
wmmhello's avatar
wmmhello 已提交
1305
  pTaskInfo->streamInfo.currentOffset = *pOffset;
1306

1307 1308
  return 0;
}
H
Haojun Liao 已提交
1309 1310 1311

/*
 * RPC response handler for requests whose ahandle is an SMsgSendInfo.
 *
 * The response payload is copied into a newly allocated buffer, which is passed to pSendInfo->fp
 * together with the response code. The RPC payload and the send info are then freed. buf.pData is
 * not freed here; presumably the callback takes ownership of it. If the copy cannot be allocated,
 * the callback receives TSDB_CODE_OUT_OF_MEMORY and an empty buffer (len 0, pData NULL).
 *
 * A response without an ahandle is logged and dropped. Its payload is still released, so it no
 * longer leaks.
 */
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pSendInfo == NULL) {
    qError("pMsg->info.ahandle is NULL");
    rpcFreeCont(pMsg->pCont);
    return;
  }

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
      buf.len = 0;  // never advertise a payload length without a payload
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}

1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = tinfo;
  SArray* plist = getTableListInfo(pTaskInfo);

  // only extract table in the first elements
  STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);

  SArray* pUidList = taosArrayInit(10, sizeof(uint64_t));

  int32_t numOfTables = tableListGetSize(pTableListInfo);
  for(int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
    taosArrayPush(pUidList, &pKeyInfo->uid);
  }

  taosArrayDestroy(plist);
  return pUidList;
}
1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372

/*
 * Follow pDownstream[0] from `pOperator` and append to `pList` the address of the pTableListInfo
 * of the first scan operator reached. A stream scan contributes the list of its inner table-scan
 * operator; a table scan contributes its own list. Nothing is appended if the chain ends first.
 */
static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
  const SOperatorInfo* pCur = pOperator;

  while (1) {
    if (pCur->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      SStreamScanInfo* pStreamScan = pCur->info;
      STableScanInfo*  pTableScan = pStreamScan->pTableScanOp->info;
      taosArrayPush(pList, &pTableScan->base.pTableListInfo);
      return;
    }

    if (pCur->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
      STableScanInfo* pTableScan = pCur->info;
      taosArrayPush(pList, &pTableScan->base.pTableListInfo);
      return;
    }

    if (pCur->pDownstream == NULL || pCur->pDownstream[0] == NULL) {
      return;
    }

    pCur = pCur->pDownstream[0];
  }
}

/*
 * Build a new array of STableListInfo pointers (elements of POINTER_BYTES) collected by
 * extractTableList from the task's root operator. The caller destroys the returned array.
 */
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
  SArray* pResult = taosArrayInit(0, POINTER_BYTES);
  extractTableList(pResult, pTaskInfo->pRoot);
  return pResult;
}

/*
 * Ask the task's root operator to release its stream state through fpSet.releaseStreamStateFn.
 * If there is no root operator or it does not install this hook, nothing is done instead of
 * calling through a NULL pointer. Always returns 0.
 */
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
  SOperatorInfo* pRoot = pTaskInfo->pRoot;

  if (pRoot != NULL && pRoot->fpSet.releaseStreamStateFn != NULL) {
    pRoot->fpSet.releaseStreamStateFn(pRoot);
  }
  return 0;
}

/*
 * Ask the task's root operator to reload its stream state through fpSet.reloadStreamStateFn.
 * If there is no root operator or it does not install this hook, nothing is done instead of
 * calling through a NULL pointer. Always returns 0.
 */
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
  SOperatorInfo* pRoot = pTaskInfo->pRoot;

  if (pRoot != NULL && pRoot->fpSet.reloadStreamStateFn != NULL) {
    pRoot->fpSet.reloadStreamStateFn(pRoot);
  }
  return 0;
}