You need to sign in or sign up before continuing.
scanoperator.c 108.7 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
33
#include "vnode.h"
H
Haojun Liao 已提交
34 35

// Mark the given scan-info object as running a reverse scan (scanFlag = REVERSE_SCAN).
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)

// Flip an order value in place: TSDB_ORDER_ASC becomes TSDB_ORDER_DESC, any other value becomes TSDB_ORDER_ASC.
// Evaluates `n` twice, so pass a plain lvalue.
#define SWITCH_ORDER(n) ((n) = (((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
37

H
Haojun Liao 已提交
38 39 40 41 42 43 44 45 46 47 48 49
// Execution statistics kept for the table merge scan (its use is outside this part of the file).
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;  // block-load counters, same recorder type the table scan keeps in readRecorder
  SSortExecInfo          sortExecInfo;   // sort execution info
} STableMergeScanExecInfo;

// Parameters describing one sort input source of the table merge scan (its use is outside this part of the file).
typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;   // presumably the index of the reader feeding this source — confirm against users
  uint64_t       uid;         // presumably the uid of the table read by this source — confirm against users
  SSDataBlock*   inputBlock;
} STableMergeScanSortSourceParam;

L
Liu Jicong 已提交
50
/*
 * Decide whether the current data block should be processed under block sampling.
 *
 * Sampling is currently disabled: every block is processed and pInfo is not read.
 * The former (disabled) implementation accepted every block when pInfo->sampleRatio == 1. Otherwise it kept a block
 * when a random value drawn from pInfo->seed was divisible by (uint32_t)(1 / sampleRatio). It advanced the seed
 * through the const-qualified pInfo, which must be fixed before sampling is re-enabled.
 */
static bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
  (void)pInfo;  // unused while sampling is disabled
  return true;
}

65
// Toggle the scan order (asc <-> desc, see SWITCH_ORDER) of the first numOfOutput function contexts in pCtx.
// A non-positive numOfOutput leaves pCtx untouched.
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SWITCH_ORDER(pCtx[idx].order);
    idx++;
  }
}

71 72 73 74 75 76 77 78 79
/*
 * Move the time window `tw` one step forward (ascending order) or backward (descending order).
 *
 * Fixed-length units:
 *   - skey moves by one sliding step;
 *   - ekey = skey + interval - 1.
 *
 * Calendar units ('n' = month, 'y' = year, converted to months):
 *   - skey moves by `interval` months in local calendar time;
 *   - ekey is one time unit before the start of the month `interval` months later.
 *
 * NOTE(review): the calendar branch steps by the interval, not by pInterval->sliding as the fixed-length
 * branch does — confirm intended.
 */
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);

  bool calendarUnit = (pInterval->intervalUnit == 'n') || (pInterval->intervalUnit == 'y');
  if (!calendarUnit) {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // calendar arithmetic is done in whole months
  int64_t months = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    months *= 12;
  }

  // window start in seconds, broken down into local calendar fields
  int64_t   startSec = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;
  time_t    secs = (time_t)startSec;
  struct tm cal;
  taosLocalTime(&secs, &cal);

  // the order of the two taosMktime() calls matters: the second reuses the fields left by the first
  int monthIdx = (int)(cal.tm_year * 12 + cal.tm_mon + months * factor);
  cal.tm_year = monthIdx / 12;
  cal.tm_mon = monthIdx % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&cal) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  monthIdx = (int)(monthIdx + months);
  cal.tm_year = monthIdx / 12;
  cal.tm_mon = monthIdx % 12;
  int64_t nextStart =
      convertTimePrecision((int64_t)taosMktime(&cal) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
  tw->ekey = nextStart - 1;
}

104
/*
 * Check whether the block's time range [skey, ekey] intersects an interval window aligned by pInterval.
 *
 * Starting window, by scan order:
 *   - ascending: the window aligned on the block's first key;
 *   - descending: the window aligned on the block's last key.
 * Following windows are walked with getNextTimeWindow() in the same order.
 * Returns false immediately when pInterval->interval is 0, meaning the upstream operator is not an interval
 * operator.
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  if (pInterval->interval == 0) {
    return false;
  }

  int64_t blockStart = pBlockInfo->window.skey;
  int64_t blockEnd = pBlockInfo->window.ekey;
  bool    ascending = (order == TSDB_ORDER_ASC);

  STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, ascending ? blockStart : blockEnd);
  if (ascending) {
    assert(w.ekey >= blockStart);
  } else {
    assert(w.skey <= blockEnd);
  }

  if (TMAX(w.skey, blockStart) <= TMIN(w.ekey, blockEnd)) {
    return true;
  }

  for (;;) {
    getNextTimeWindow(pInterval, &w, order);

    if (ascending) {
      if (w.skey > blockEnd) {
        break;  // walked past the end of the block
      }
      assert(w.ekey > blockEnd);
      if (TMAX(w.skey, blockStart) <= blockEnd) {
        return true;
      }
    } else {
      if (w.ekey < blockStart) {
        break;  // walked past the beginning of the block
      }
      assert(w.skey < blockStart);
      if (blockStart <= TMIN(w.ekey, blockEnd)) {
        return true;
      }
    }
  }

  return false;
}

155 156 157 158 159 160 161 162 163 164 165
// this function is for table scanner to extract temporary results of upstream aggregate results.
//
// Looks up the result row that the pushed-down aggregate support keeps for `groupId`.
// Returns NULL in three cases:
//   - pOperator is not a table scan;
//   - no row has been recorded for the group;
//   - the buffer page holding the row cannot be obtained.
// On success *pPage is set to that page, and the caller must release it with releaseBufPage().
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  int64_t buf[2] = {0};
  SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId);

  STableScanInfo* pTableScanInfo = pOperator->info;

  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, buf,
                                                               GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
  if (p1 == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId);
  if (*pPage == NULL) {
    // the page could not be loaded; offsetting a NULL page would hand the caller a bogus, non-NULL row pointer
    return NULL;
  }

  return (SResultRow*)((char*)(*pPage) + p1->offset);
}

/*
 * Use the partial aggregate results of the block's group to decide whether the block can be skipped.
 *
 * Each pushed-down expression is asked via fmFuncDynDataRequired() whether it still needs this block. Only when
 * every one answers FUNC_DATA_REQUIRED_NOT_LOAD is *status set to FUNC_DATA_REQUIRED_NOT_LOAD; otherwise *status
 * is left unchanged. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScanInfo = pOperator->info;
  SExprSupp*      pSupp = pScanInfo->base.pdInfo.pExprSup;

  // no aggregate expressions were pushed down: nothing to prune with
  if (pSupp == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pPage = NULL;
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->groupId, &pPage);
  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t idx = 0;
  for (; idx < pSupp->numOfExprs; ++idx) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, pSupp->rowEntryInfoOffset);
    int32_t              required = fmFuncDynDataRequired(pSupp->pCtx[idx].functionId, pEntry, &pBlockInfo->window);
    if (required != FUNC_DATA_REQUIRED_NOT_LOAD) {
      break;
    }
  }

  // the block is skippable only if the loop above never stopped early
  bool skippable = (idx >= pSupp->numOfExprs);

  // release buffer pages
  releaseBufPage(pScanInfo->base.pdInfo.pAggSup->pResultBuf, pPage);

  if (skippable) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
216
// Evaluate the filter against the block's column aggregates (filterRangeExecute).
// Returns true when the block may contain qualifying rows and must be kept.
// Without a filter or without aggregates nothing can be decided, so the block is kept.
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                               int32_t numOfRows) {
  bool canEvaluate = (pFilterInfo != NULL) && (pColsAgg != NULL);
  if (!canEvaluate) {
    return true;
  }
  return filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
}

H
Haojun Liao 已提交
226
/*
 * Load the block-level column aggregates (SMA) of the reader's current block into pBlock->pBlockAgg.
 *
 * Returns false when not all columns have aggregates; pBlock->pBlockAgg is then left untouched.
 * On success, the aggregate of each output column is stored at that column's dstSlotId.
 * Reader errors and allocation failures long-jump via pTaskInfo->env.
 */
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SColumnDataAgg** pAggList = NULL;
  bool             complete = true;

  int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pAggList, &complete);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // partial aggregates are useless: every column must carry them
  if (!complete) {
    return false;
  }

  // todo create this buffer during creating operator
  if (pBlock->pBlockAgg == NULL) {
    int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
    pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
    if (pBlock->pBlockAgg == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  SArray* pMatchList = pTableScanInfo->matchInfo.pList;
  size_t  total = taosArrayGetSize(pMatchList);
  for (int32_t i = 0; i < total; ++i) {
    SColMatchItem* pItem = taosArrayGet(pMatchList, i);
    if (pItem->needOutput) {
      pBlock->pBlockAgg[pItem->dstSlotId] = pAggList[i];
    }
  }

  return true;
}

H
Haojun Liao 已提交
263
// Fill the pseudo columns (tbname / tags) of the first `rows` rows of pBlock when the scan has any.
// A missing table (TSDB_CODE_PAR_TABLE_NOT_EXIST) is tolerated; any other error long-jumps via pTaskInfo->env.
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
                               int32_t rows) {
  SExprSupp* pPseudo = &pTableScanInfo->pseudoSup;
  if (pPseudo->numOfExprs <= 0) {
    return;
  }

  int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pPseudo->pExprInfo, pPseudo->numOfExprs, pBlock,
                                        rows, GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);

  // the table may have been dropped while the scan was running: that is not fatal
  bool tableDropped = (code == TSDB_CODE_PAR_TABLE_NOT_EXIST);
  if (code != TSDB_CODE_SUCCESS && !tableDropped) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // reset the error code.
  terrno = 0;
}

280 281
// todo handle the slimit info
/*
 * Apply LIMIT/OFFSET to one output block. Two steps:
 *   1. Skip up to pLimitInfo->remainOffset leading rows, possibly the whole block, and decrease remainOffset.
 *   2. Truncate the block so that numOfOutputRows plus this block's rows does not exceed the limit.
 *      When the limit is reached, the operator is marked OP_EXEC_DONE.
 * pLimitInfo->numOfOutputRows is read here but maintained by the caller.
 */
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
  SLimit*     pLimit = &pLimitInfo->limit;
  const char* id = GET_TASKID(pTaskInfo);

  if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
    if (pLimitInfo->remainOffset >= pBlock->info.rows) {
      pLimitInfo->remainOffset -= pBlock->info.rows;
      pBlock->info.rows = 0;
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
    } else {
      blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
      pLimitInfo->remainOffset = 0;
    }
  }

  if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
    // limit the output rows; computed in 64 bits since limit and numOfOutputRows are 64-bit counters
    int64_t overflowRows = pLimitInfo->numOfOutputRows + pBlock->info.rows - pLimit->limit;
    int64_t keep = pBlock->info.rows - overflowRows;
    if (keep < 0) {
      // earlier output already exceeds the limit: emit nothing from this block rather than pass a negative count
      keep = 0;
    }

    blockDataKeepFirstNRows(pBlock, keep);
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
    pOperator->status = OP_EXEC_DONE;
  }
}

H
Haojun Liao 已提交
307
/*
 * Decide how much of the reader's current block is needed, load it accordingly, then apply filter and limit/offset.
 *
 * On return, *status holds the decision:
 *   - FUNC_DATA_REQUIRED_FILTEROUT: the block produced nothing;
 *   - FUNC_DATA_REQUIRED_NOT_LOAD: the block was skipped, only the pseudo columns of one row are set;
 *   - FUNC_DATA_REQUIRED_SMA_LOAD: only the block aggregates were loaded;
 *   - FUNC_DATA_REQUIRED_DATA_LOAD: the rows were read, filtered and limited.
 * Returns terrno when the block data cannot be retrieved, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SFileBlockLoadRecorder* pRec = &pTableScanInfo->readRecorder;
  SDataBlockInfo*         pInfo = &pBlock->info;
  SFilterInfo*            pFilter = pOperator->exprSupp.pFilterInfo;

  pRec->totalBlocks += 1;
  pRec->totalRows += pInfo->rows;

  // a filter or an overlapping interval window forces the real data to be loaded
  *status = pTableScanInfo->dataBlockLoadFlag;
  if (pFilter != NULL || overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, pInfo, pTableScanInfo->cond.order)) {
    *status = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  taosMemoryFreeClear(pBlock->pBlockAgg);

  bool smaAttempted = false;
  switch (*status) {
    case FUNC_DATA_REQUIRED_FILTEROUT:
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
             pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
      pRec->filterOutBlocks += 1;
      // NOTE(review): totalRows already received this block's rows on entry, so they are counted twice here —
      // confirm intended.
      pRec->totalRows += pInfo->rows;
      return TSDB_CODE_SUCCESS;

    case FUNC_DATA_REQUIRED_NOT_LOAD:
      qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
             pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
      pRec->skipBlocks += 1;
      return TSDB_CODE_SUCCESS;

    case FUNC_DATA_REQUIRED_SMA_LOAD:
      pRec->loadBlockStatis += 1;
      smaAttempted = true;  // mark the operation of load sma;
      if (doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
        qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
               pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
        doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
        return TSDB_CODE_SUCCESS;
      }
      // the block aggregates are not available for every column: load the data block instead
      qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
      break;

    default:
      break;
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (pFilter != NULL && !smaAttempted && doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
    size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
    if (!doFilterByBlockSMA(pFilter, pBlock->pBlockAgg, numOfCols, pInfo->rows)) {
      qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
             pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
      pRec->filterOutBlocks += 1;
      *status = FUNC_DATA_REQUIRED_FILTEROUT;
      return TSDB_CODE_SUCCESS;
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results
  doDynamicPruneDataBlock(pOperator, pInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
    pRec->skipBlocks += 1;
    *status = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }

  pRec->totalCheckedRows += pInfo->rows;
  pRec->loadBlocks += 1;

  SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pCols == NULL) {
    return terrno;
  }

  relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pInfo->rows);

  // restore the previous value; the rows surviving filter and limit are added back below
  pRec->totalRows -= pInfo->rows;

  if (pFilter != NULL) {
    int64_t startUs = taosGetTimestampUs();
    doFilter(pBlock, pFilter, &pTableScanInfo->matchInfo);
    double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;
    pRec->filterTime += elapsedMs;

    if (pInfo->rows == 0) {
      pRec->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pInfo->window.skey, pInfo->window.ekey, pInfo->rows, elapsedMs);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), elapsedMs);
    }
  }

  applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator);

  pRec->totalRows += pInfo->rows;
  pTableScanInfo->limitInfo.numOfOutputRows = pRec->totalRows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
422
// Switch the scan to descending order, in four steps:
//   - set the reverse-scan flag;
//   - flip the order of the first numOfOutput function contexts;
//   - set cond.order to descending;
//   - swap the query window bounds.
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);

  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
  TSWAP(pTableScanInfo->cond.twindows.skey, pTableScanInfo->cond.twindows.ekey);
}

431 432
typedef struct STableCachedVal {
  const char* pName;
433
  STag*       pTags;
434 435
} STableCachedVal;

436 437 438 439 440 441 442 443 444 445 446
static void freeTableCachedVal(void* param) {
  if (param == NULL) {
    return;
  }

  STableCachedVal* pVal = param;
  taosMemoryFree((void*)pVal->pName);
  taosMemoryFree(pVal->pTags);
  taosMemoryFree(pVal);
}

H
Haojun Liao 已提交
447 448 449 450 451 452 453
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
  pVal->pName = strdup(pMetaReader->me.name);
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
454
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
455 456 457 458 459 460 461
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

462 463
// const void *key, size_t keyLen, void *value
// Deleter registered with taosLRUCacheInsert(): releases the cached STableCachedVal; key and keyLen are unused.
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  freeTableCachedVal(value);
}
464

465 466
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
467
  // currently only the tbname pseudo column
468
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
469
    return TSDB_CODE_SUCCESS;
470 471
  }

472 473
  int32_t code = 0;

474 475 476 477
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

478
  bool            freeReader = false;
479
  STableCachedVal val = {0};
480 481

  SMetaReader mr = {0};
482
  LRUHandle*  h = NULL;
483

484
  // 1. check if it is existed in meta cache
485
  if (pCache == NULL) {
486
    metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
487
    code = metaGetTableEntryByUidCache(&mr, pBlock->info.uid);
488
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
489
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
490 491
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid,
              tstrerror(terrno), idStr);
H
Haojun Liao 已提交
492 493 494
      } else {
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
      }
495 496 497 498 499
      metaReaderClear(&mr);
      return terrno;
    }

    metaReaderReleaseLock(&mr);
500

501 502
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
503 504

    freeReader = true;
505
  } else {
506 507
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
508
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(pBlock->info.uid));
509 510
    if (h == NULL) {
      metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
511
      code = metaGetTableEntryByUidCache(&mr, pBlock->info.uid);
512
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
513
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
514 515
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
                pBlock->info.uid, tstrerror(terrno), idStr);
H
Haojun Liao 已提交
516
        } else {
517 518
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno),
                 idStr);
H
Haojun Liao 已提交
519
        }
520 521 522 523 524 525
        metaReaderClear(&mr);
        return terrno;
      }

      metaReaderReleaseLock(&mr);

H
Haojun Liao 已提交
526
      STableCachedVal* pVal = createTableCacheVal(&mr);
527

H
Haojun Liao 已提交
528
      val = *pVal;
529
      freeReader = true;
H
Haojun Liao 已提交
530

531 532
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(uint64_t), pVal,
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
533 534 535 536 537 538 539 540
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
541

H
Haojun Liao 已提交
542
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
543
    }
H
Haojun Liao 已提交
544

545 546
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
547
  }
548

549 550
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
551
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
552 553

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
554
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
555

556
    int32_t functionId = pExpr1->pExpr->_function.functionId;
557 558 559

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
560
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
561
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
562
      STagVal tagVal = {0};
563 564
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
565

566 567 568 569
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
570
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
571
      }
572

H
Haojun Liao 已提交
573 574 575
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
576
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
H
Haojun Liao 已提交
577
        colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
H
Haojun Liao 已提交
578 579 580
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
L
Liu Jicong 已提交
581
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
582
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
H
Haojun Liao 已提交
583
          colDataAppend(pColInfoData, i, data, false);
H
Haojun Liao 已提交
584
        }
585 586 587 588
      }
    }
  }

589 590
  // restore the rows
  pBlock->info.rows = backupRows;
591 592 593 594
  if (freeReader) {
    metaReaderClear(&mr);
  }

H
Haojun Liao 已提交
595
  return TSDB_CODE_SUCCESS;
596 597
}

H
Haojun Liao 已提交
598
/*
 * Write the table name `name` into pColInfoData through the scalar function identified by functionId.
 * The name is wrapped in a one-row VARCHAR column, which is passed as the single input with numOfRows set to
 * pBlock->info.rows. If the function has no process callback, an error is logged and pColInfoData is not touched.
 */
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
  struct SScalarFuncExecFuncs funcs = {0};
  fmGetScalarFuncExecFuncs(functionId, &funcs);

  char nameVar[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(nameVar, name);

  SColumnInfoData input = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, sizeof(nameVar), 1);
  colInfoDataEnsureCapacity(&input, 1, false);
  colDataAppend(&input, 0, nameVar, false);

  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &input};
  SScalarParam dstParam = {.columnData = pColInfoData};

  if (funcs.process == NULL) {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  } else {
    funcs.process(&srcParam, 1, &dstParam);
  }

  colDataDestroy(&input);
}

623
/*
 * Pull blocks from the table reader until one survives loading and filtering, and return it.
 *
 * The returned block is pTableScanInfo->pResBlock, refilled for every block read. Returns NULL when the reader has
 * no more blocks. Cancellation, capacity and load errors long-jump via pTaskInfo->env.
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*    pBlock = pTableScanInfo->pResBlock;

  int64_t st = taosGetTimestampUs();

  while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pTableScanInfo->sample)) {
      continue;
    }

    blockDataCleanup(pBlock);
    SDataBlockInfo* pBInfo = &pBlock->info;

    int32_t rows = 0;
    tsdbRetrieveDataBlockInfo(pTableScanInfo->base.dataReader, &rows, &pBInfo->uid, &pBInfo->window);

    // todo remove it latter
    int32_t code = blockDataEnsureCapacity(pBlock, rows);
    if (code != TSDB_CODE_SUCCESS) {
      // the columns could not be grown to `rows`; loading the block into them would write out of bounds
      T_LONG_JMP(pTaskInfo->env, code);
    }
    pBInfo->rows = rows;

    ASSERT(pBInfo->uid != 0);
    pBInfo->groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->uid);

    uint32_t status = 0;
    code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBInfo->rows == 0) {
      continue;
    }

    pOperator->resultInfo.totalRows = pTableScanInfo->base.readRecorder.totalRows;
    pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
    pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;

    // record the table and last timestamp of the returned block as the snapshot-data position of the stream status
    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
    pTaskInfo->streamInfo.lastStatus.uid = pBInfo->uid;
    pTaskInfo->streamInfo.lastStatus.ts = pBInfo->window.ekey;

    ASSERT(pBInfo->uid != 0);
    return pBlock;
  }

  return NULL;
}

H
Haojun Liao 已提交
683
/*
 * Run the ascending and then the descending scan rounds requested by scanInfo over the current table set.
 *
 * scanInfo.numOfAsc ascending rounds run first, then scanInfo.numOfDesc descending rounds. The reader is reset
 * between rounds. Returns the next non-empty block, or NULL when every round is exhausted, when no reader was
 * created, or when the operator is already done. pTableScanInfo->scanTimes counts the completed rounds.
 */
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  STableScanBase* pBase = &pInfo->base;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pBase->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // phase 1: ascending rounds
  while (pInfo->scanTimes < pInfo->scanInfo.numOfAsc) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pInfo->scanTimes += 1;
    if (pInfo->scanTimes >= pInfo->scanInfo.numOfAsc) {
      break;
    }

    // do prepare for the next round table scan operation
    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = REPEAT_SCAN;
    qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
  }

  // phase 2: descending rounds
  int32_t total = pInfo->scanInfo.numOfAsc + pInfo->scanInfo.numOfDesc;
  if (pInfo->scanTimes >= total) {
    return NULL;
  }

  if (pBase->cond.order == TSDB_ORDER_ASC) {
    prepareForDescendingScan(pBase, pOperator->exprSupp.pCtx, 0);
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
  }

  while (pInfo->scanTimes < total) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pInfo->scanTimes += 1;
    if (pInfo->scanTimes >= total) {
      break;
    }

    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = REPEAT_SCAN;
    qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
  }

  return NULL;
}

/*
 * getNextFn of the table scan operator.
 *
 * TABLE_SCAN__TABLE_ORDER: the reader is pointed at one table at a time (tsdbSetTableList with a single
 * STableKeyInfo). When a table yields no more data the next one is loaded; NULL is returned after the
 * last table.
 *
 * Otherwise (group by group): on the first call (currentGroupId == -1) a tsdb reader is opened over the
 * tables of group 0. Whenever a group is exhausted the reader is re-targeted at the next output group,
 * the operator status and the per-group limit/offset counters are reset, and scanning continues.
 * Groups that produce no rows are skipped. The operator is marked completed only once every group has
 * been exhausted.
 *
 * A tsdbReaderOpen failure is raised through T_LONG_JMP on pTaskInfo->env.
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // scan table one by one sequentially
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);

    while (1) {
      SSDataBlock* result = doGroupedTableScan(pOperator);
      if (result) {
        return result;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;
      if (pInfo->currentTable >= numOfTables) {
        return NULL;
      }

      STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
      tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
             pInfo->currentTable, pTaskInfo->id.str);

      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;
    }
  }

  // scan table group by group sequentially
  if (pInfo->currentGroupId == -1) {
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    int32_t        num = 0;
    STableKeyInfo* pList = NULL;
    tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
    ASSERT(pInfo->base.dataReader == NULL);

    int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num,
                                  (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  while (1) {
    SSDataBlock* result = doGroupedTableScan(pOperator);
    if (result != NULL) {
      ASSERT(result->info.uid != 0);
      return result;
    }

    // The current group is exhausted. Keep advancing: an empty group must not end the whole scan
    // while later groups may still hold data.
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    // reset value for the next group data output
    pOperator->status = OP_OPENED;
    pInfo->base.limitInfo.numOfOutputRows = 0;
    pInfo->base.limitInfo.remainOffset = pInfo->base.limitInfo.limit.offset;

    int32_t        num = 0;
    STableKeyInfo* pList = NULL;
    tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);

    tsdbSetTableList(pInfo->base.dataReader, pList, num);
    tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
    pInfo->scanTimes = 0;
  }
}

821 822
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
823
  STableScanInfo*         pTableScanInfo = pOptr->info;
H
Haojun Liao 已提交
824
  *pRecorder = pTableScanInfo->base.readRecorder;
825 826 827 828 829
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

830
static void destroyTableScanOperatorInfo(void* param) {
831
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
H
Haojun Liao 已提交
832
  blockDataDestroy(pTableScanInfo->pResBlock);
H
Haojun Liao 已提交
833
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
H
Haojun Liao 已提交
834

H
Haojun Liao 已提交
835 836
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
837

H
Haojun Liao 已提交
838 839
  if (pTableScanInfo->base.matchInfo.pList != NULL) {
    taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
840
  }
L
Liu Jicong 已提交
841

H
Haojun Liao 已提交
842 843
  taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
D
dapan1121 已提交
844
  taosMemoryFreeClear(param);
845 846
}

847
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
848
                                           SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
849 850 851
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
852
    goto _error;
H
Haojun Liao 已提交
853 854
  }

855
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
856
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
857 858

  int32_t numOfCols = 0;
859
  int32_t code =
H
Haojun Liao 已提交
860
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
861 862 863 864
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
865
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
866
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
867
  if (code != TSDB_CODE_SUCCESS) {
868
    goto _error;
869 870
  }

H
Haojun Liao 已提交
871
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
872
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
873
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
874
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
875 876
  }

877
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
H
Haojun Liao 已提交
878 879

  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
880 881
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
882 883
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
884

H
Haojun Liao 已提交
885
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;
H
Haojun Liao 已提交
886 887

  initResultSizeInfo(&pOperator->resultInfo, 4096);
888
  pInfo->pResBlock = createResDataBlock(pDescNode);
H
Haojun Liao 已提交
889
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
890

H
Haojun Liao 已提交
891 892 893
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
894 895
  }

wmmhello's avatar
wmmhello 已提交
896
  pInfo->currentGroupId = -1;
897
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
898

L
Liu Jicong 已提交
899 900
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
901
  pOperator->exprSupp.numOfExprs = numOfCols;
902

H
Haojun Liao 已提交
903 904
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
905 906 907
    code = terrno;
    goto _error;
  }
908

H
Haojun Liao 已提交
909
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
H
Haojun Liao 已提交
910
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
911
                                         getTableScannerExecInfo);
912 913 914

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
915
  return pOperator;
916

917
_error:
918 919 920
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
921

922 923
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
924
  return NULL;
H
Haojun Liao 已提交
925 926
}

927
/*
 * Builds a TableSeqScanOperator that drives an already-opened reader (pReadHandle is stored as
 * base.dataReader) through doTableScanImpl.
 *
 * @return the operator, or NULL with pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->base.dataReader = pReadHandle;

  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL);
  return pOperator;
}

940
/* Rewinds the read cursor and empties the list of buffered input blocks. */
static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  pInfo->validBlockIndex = 0;
  taosArrayClear(pInfo->pBlockLists);
}

945
/* True when the downstream (parent) operator is a stream session-window aggregation. */
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
      return true;
    default:
      return false;
  }
}

949
/* True when the downstream (parent) operator is a stream state-window aggregation. */
static bool isStateWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
      return true;
    default:
      return false;
  }
}
5
54liuyao 已提交
952

L
Liu Jicong 已提交
953
/* True when the parent is any flavour of stream interval operator (single, semi or final). */
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
      return true;
    default:
      return false;
  }
}

/* True only for the plain (non semi/final) stream interval parent. The name's spelling is kept for callers. */
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
      return true;
    default:
      return false;
  }
}

963 964 965 966
/* An interval window is "sliding" when its sliding step differs from its interval length. */
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.sliding != pInfo->interval.interval;
}

967
/* Copies the uint64 group id stored at (groupColIndex, rowIndex) of pBlock into pInfo->groupId. */
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  ASSERT(rowIndex < pBlock->info.rows);

  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  const uint64_t*  groupIds = (const uint64_t*)pGroupCol->pData;
  pInfo->groupId = groupIds[rowIndex];
}

L
Liu Jicong 已提交
974
/*
 * Re-targets the table scan at a new time window.
 *
 * Resets the pass counter and the group cursor (-1) and closes the current reader. The
 * group-by-group path of doTableScan reopens a reader when it sees currentGroupId == -1.
 */
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
  STableScanInfo* pScan = pTableScanInfo;

  pScan->base.cond.twindows = *pWin;
  pScan->scanTimes = 0;
  pScan->currentGroupId = -1;

  tsdbReaderClose(pScan->base.dataReader);
  pScan->base.dataReader = NULL;
}

L
Liu Jicong 已提交
982 983
/*
 * Reads, with a temporary reader, the first data block of table tbUid within [startTs, endTs] at versions up to
 * maxVersion. The result lands in the table scan operator's shared pResBlock, which is cleaned first.
 *
 * @return pResBlock when it holds at least one row, otherwise NULL.
 *
 * Reader-open and buffer-growth failures are raised through T_LONG_JMP on the task env, after terrno is
 * set and any reader opened here is closed.
 */
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};

  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
  SQueryTableDataCond cond = pTableScanInfo->base.cond;

  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  blockDataCleanup(pBlock);

  STsdbReader* pReader = NULL;
  int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, &pReader,
                                GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  bool hasBlock = tsdbNextDataBlock(pReader);
  if (hasBlock) {
    SDataBlockInfo* pBInfo = &pBlock->info;

    int32_t rows = 0;
    tsdbRetrieveDataBlockInfo(pReader, &rows, &pBInfo->uid, &pBInfo->window);

    SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL);
    code = blockDataEnsureCapacity(pBlock, rows);
    if (code != TSDB_CODE_SUCCESS) {
      // never claim `rows` rows in a block that could not grow; release the reader before unwinding
      tsdbReaderClose(pReader);
      terrno = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
    pBlock->info.rows = rows;

    relocateColumnData(pBlock, pTableScanInfo->base.matchInfo.pList, pCols, true);
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, rows);

    pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->uid);
  }

  tsdbReaderClose(pReader);
  qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);

  return pBlock->info.rows > 0 ? pBlock : NULL;
}

/*
 * Recomputes the partition group id of table `uid` from its stored row at timestamp `ts`
 * (versions <= maxVersion). Returns 0 when no such row exists.
 */
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pRowBlock = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  bool         hasRow = (pRowBlock != NULL) && (pRowBlock->info.rows != 0);
  if (!hasRow) {
    return 0;
  }

  ASSERT(pRowBlock->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pRowBlock, 0);
}

5
54liuyao 已提交
1041
/* Looks up the group id of table `uid` in the task's table list. */
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  SExecTaskInfo* pTaskInfo = pInfo->pTableScanOp->pTaskInfo;
  return getTableGroupId(pTaskInfo->pTableInfoList, uid);
}

5
54liuyao 已提交
1045 1046 1047 1048 1049 1050 1051 1052
/*
 * Resolves the group id of a row. When the partition needs per-row calculation (partitionSup.needCalc),
 * the id is recomputed from the stored data; otherwise it comes from the table-list uid -> group mapping.
 */
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063
/*
 * Prepares the table scan operator for the next range taken from a range block (start/end ts,
 * group id, calculate start/end ts columns), beginning at row *pRowIndex.
 *
 * The starting row fixes the group and the initial window. Following rows of the same group are merged
 * while their start equals the window start (which extends the end) or their end equals the window
 * start (which extends the start). *pRowIndex is left on the first row not merged.
 *
 * @return false when every row has already been consumed; true after the scan operator has been reset
 *         to the merged window and reopened.
 */
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  int32_t cur = *pRowIndex;
  if (cur == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  const TSKEY*    startTs = (const TSKEY*)pStartCol->pData;
  const TSKEY*    endTs = (const TSKEY*)pEndCol->pData;
  const uint64_t* groupIds = (const uint64_t*)pGroupCol->pData;
  const TSKEY*    calStartTs = (const TSKEY*)pCalStartCol->pData;
  const TSKEY*    calEndTs = (const TSKEY*)pCalEndCol->pData;

  const uint64_t groupId = groupIds[cur];
  STimeWindow    win = {.skey = startTs[cur], .ekey = endTs[cur]};

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, cur);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartTs[cur];
    pInfo->updateWin.ekey = calEndTs[cur];
  }

  for (cur += 1; cur < pBlock->info.rows; ++cur) {
    bool sameGroup = (groupId == groupIds[cur]);
    if (sameGroup && win.skey == startTs[cur]) {
      win.ekey = TMAX(win.ekey, endTs[cur]);
    } else if (sameGroup && win.skey == endTs[cur]) {
      win.skey = TMIN(win.skey, startTs[cur]);
    } else {
      ASSERT(!(win.skey > startTs[cur] && win.ekey < endTs[cur]) ||
             !(isInTimeWindow(&win, startTs[cur], 0) || isInTimeWindow(&win, endTs[cur], 0)));
      break;
    }
  }
  *pRowIndex = cur;

  resetTableScanInfo(pInfo->pTableScanOp->info, &win);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1099
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1100
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1101
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1102
  dumyInfo.cur.pageId = -1;
1103
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1104 1105
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1106
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1107

5
54liuyao 已提交
1108
  while (1) {
1109 1110 1111
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1112 1113 1114 1115 1116 1117
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] < endWin.ekey)) {
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1118
    }
5
54liuyao 已提交
1119

5
54liuyao 已提交
1120 1121 1122
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1123
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1124
    endWin = preWin;
5
54liuyao 已提交
1125
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1126 1127 1128 1129 1130 1131
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1132

L
Liu Jicong 已提交
1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143
/*
 * Drives range scans for the ranges listed in pSDB (see prepareRangeScan), returning the next
 * non-empty, filtered block that belongs to the current group pInfo->groupId.
 *
 * When partitionSup.needCalc is set, rows are kept only if their recomputed group id matches.
 * Otherwise whole blocks are matched on info.groupId. Returned blocks carry pInfo->updateWin as calWin.
 *
 * When every range is exhausted, pSDB is cleaned, *pRowIndex and updateWin are reset, the scan reader
 * is closed, and NULL is returned. tsColIndex is currently unused.
 * Allocation failure is raised through T_LONG_JMP on the task env.
 */
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  while (1) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
      // scan next window data
      pResult = doTableScan(pInfo->pTableScanOp);
    }
    if (!pResult) {
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
      tsdbReaderClose(pTableScanInfo->base.dataReader);
      pTableScanInfo->base.dataReader = NULL;
      return NULL;
    }

    doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
    if (pResult->info.rows == 0) {
      continue;
    }

    if (pInfo->partitionSup.needCalc) {
      SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
      if (tmpBlock == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        T_LONG_JMP(pInfo->pTableScanOp->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
      }

      // keep only the rows whose recomputed group id is the group being scanned
      blockDataCleanup(pResult);
      for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
        if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, tmpBlock, i) == pInfo->groupId) {
          for (int32_t j = 0; j < pInfo->pTableScanOp->exprSupp.numOfExprs; j++) {
            SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
            SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
            bool             isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
            char*            pSrcData = colDataGetData(pSrcCol, i);
            colDataAppend(pDestCol, pResult->info.rows, pSrcData, isNull);
          }
          pResult->info.rows++;
        }
      }

      blockDataDestroy(tmpBlock);

      if (pResult->info.rows > 0) {
        pResult->info.calWin = pInfo->updateWin;
        return pResult;
      }
    } else if (pResult->info.groupId == pInfo->groupId) {
      pResult->info.calWin = pInfo->updateWin;
      return pResult;
    }
  }
}
1184

1185 1186 1187
/*
 * Converts the changed rows of pSrcBlock into session/state-window scan ranges in pDestBlock.
 *
 * For each source row, the current session windows containing its start and end ts are looked up.
 * Rows whose start window is already closed are skipped. Otherwise one destination row is appended
 * (start.skey, end.ekey, NULL uid, group id, NULL calculate-ts).
 *
 * Destination rows are appended densely at pDestBlock->info.rows, so skipped source rows leave no gaps.
 * pDestBlock is always cleaned first, including when pSrcBlock is empty.
 *
 * @return TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity.
 */
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int64_t          version = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }
    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    ASSERT(!IS_INVALID_SESSION_WIN_KEY(endWin));

    // append at the next dense destination slot, not at source index i (rows may have been skipped)
    int32_t dstRow = pDestBlock->info.rows;
    colDataAppend(pDestStartCol, dstRow, (const char*)&startWin.win.skey, false);
    colDataAppend(pDestEndCol, dstRow, (const char*)&endWin.win.ekey, false);

    colDataAppendNULL(pDestUidCol, dstRow);
    colDataAppend(pDestGpCol, dstRow, (const char*)&groupId, false);
    colDataAppendNULL(pDestCalStartTsCol, dstRow);
    colDataAppendNULL(pDestCalEndTsCol, dstRow);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1232 1233 1234 1235 1236 1237

/*
 * Converts the changed rows of pSrcBlock into interval-window scan ranges in pDestBlock.
 *
 * Source rows are grouped by getSlidingWindow. Each group yields one destination row holding:
 *   - the affected window span (start/end ts),
 *   - the uid of the group's first row,
 *   - its group id, resolved through getGroupIdByData when the source carries 0,
 *   - the calculate start/end ts (start ts of the group's first and last rows).
 *
 * @return TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity.
 */
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);

  int32_t numOfSrcRows = pSrcBlock->info.rows;
  if (numOfSrcRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, numOfSrcRows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // source columns
  SColumnInfoData* pSrcStartCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGroupCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  ASSERT(pSrcStartCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);

  TSKEY*    startTs = (TSKEY*)pSrcStartCol->pData;
  TSKEY*    endTs = (TSKEY*)pSrcEndCol->pData;
  uint64_t* uids = (uint64_t*)pSrcUidCol->pData;
  uint64_t* groupIds = (uint64_t*)pSrcGroupCol->pData;

  // destination columns
  SColumnInfoData* pDstStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDstGroupCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int64_t prevVersion = pSrcBlock->info.version - 1;
  int32_t row = 0;
  while (row < numOfSrcRows) {
    uint64_t uid = uids[row];
    uint64_t groupId = groupIds[row];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, uid, startTs[row], prevVersion);
    }
    TSKEY calStartTs = startTs[row];

    // advances `row` past every source row folded into this window span
    STimeWindow win = getSlidingWindow(startTs, endTs, groupIds, &pInfo->interval, &pSrcBlock->info, &row,
                                       pInfo->partitionSup.needCalc);
    TSKEY calEndTs = startTs[row - 1];

    colDataAppend(pDstCalStartCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
    colDataAppend(pDstCalEndCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataAppend(pDstUidCol, pDestBlock->info.rows, (const char*)(&uid), false);
    colDataAppend(pDstStartCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataAppend(pDstEndCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataAppend(pDstGroupCol, pDestBlock->info.rows, (const char*)(&groupId), false);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1280

1281
/*
 * Copies the delete ranges in pSrcBlock into pDestBlock, one row per source row, via
 * appendOneRowToStreamSpecialBlock.
 *
 * Group ids of 0 are resolved through getGroupIdByData. When a table-name expression is configured
 * (tbnameCalSup.pExprInfo), the partition table name stored for the group is attached. The name is
 * attached only if the stream-state lookup succeeds and is non-empty; otherwise NULL is passed.
 *
 * @return TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity.
 */
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    // var-string buffer: length header + name bytes + one extra zero byte so strlen is always bounded
    char     tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN + 1] = {0};
    bool     hasTbName = false;
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
    if (pInfo->tbnameCalSup.pExprInfo) {
      void* parTbname = NULL;
      if (streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname) >= 0 &&
          parTbname != NULL) {
        memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
        varDataSetLen(tbname, strlen(varDataVal(tbname)));
        hasTbName = varDataLen(tbname) > 0;
      }
      // the looked-up name is a heap copy (calBlockTbName releases it the same way)
      if (parTbname != NULL) {
        tdbFree(parTbname);
      }
    }
    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     hasTbName ? tbname : NULL);
  }
  return TSDB_CODE_SUCCESS;
}

1322 1323 1324 1325
/*
 * Dispatches on the parent window type to build the scan-range block:
 *   - interval parents use the interval generator,
 *   - session and state parents use the session generator,
 *   - all other parents use the delete-result generator.
 *
 * The resulting pDestBlock is then tagged as STREAM_CLEAR, inherits the source version and has its
 * time window refreshed. Returns the generator's status code.
 */
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t (*generate)(SStreamScanInfo*, SSDataBlock*, SSDataBlock*) = generateDeleteResultBlock;
  if (isIntervalWindow(pInfo)) {
    generate = generateIntervalScanRange;
  } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
    generate = generateSessionScanRange;
  }

  int32_t code = generate(pInfo, pSrcBlock, pDestBlock);

  SDataBlockInfo* pDestInfo = &pDestBlock->info;
  pDestInfo->type = STREAM_CLEAR;
  pDestInfo->version = pSrcBlock->info.version;
  blockDataUpdateTsWindow(pDestBlock, 0);
  return code;
}

L
Liu Jicong 已提交
1337 1338 1339 1340 1341 1342 1343 1344 1345
/*
 * Evaluates the tag expressions (pTagCalSup) on the first row of pBlock into pResBlock.
 *
 * Does nothing when there are no tag expressions, when pBlock is empty, or when the one-row copy or
 * the result buffer cannot be allocated (the function has no error channel).
 * Building and attaching the resulting tag set is not implemented yet.
 */
static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock* pResBlock) {
  if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return;
  if (pBlock == NULL || pBlock->info.rows == 0) return;

  SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
  if (pSrcBlock == NULL) {
    return;
  }
  ASSERT(pSrcBlock->info.rows == 1);

  if (blockDataEnsureCapacity(pResBlock, 1) != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pSrcBlock);
    return;
  }

  projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, 1, NULL);
  ASSERT(pResBlock->info.rows == 1);

  // TODO: build the tag array / STag from pResBlock and attach it.

  blockDataDestroy(pSrcBlock);
}

L
Liu Jicong 已提交
1361
/*
 * Compute the partition sub-table name of pBlock and store it in pBlock->info.parTbName.
 *
 * Steps:
 *   1. Seed parTbName from the name cached in the stream state for pBlock's group id (empty
 *      string if there is none). The seed only survives if step 2 cannot run, e.g. because
 *      an allocation failed.
 *   2. Evaluate the tbname expressions (pInfo->tbnameCalSup) on the first row of pBlock.
 *      - A non-NULL VARCHAR result is copied into parTbName, truncated to
 *        TSDB_TABLE_NAME_LEN - 1 bytes and zero-terminated.
 *      - Otherwise parTbName becomes the empty string.
 *   3. If the group id is non-zero and the name is non-empty, persist the name with
 *      streamStatePutParName.
 *
 * Returns without touching pBlock when there is no tbname expression or pBlock is NULL/empty.
 */
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp*    pTbNameCalSup = &pInfo->tbnameCalSup;
  SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;

  // pTbNameCalSup is the address of a member of pInfo, so only the expression count matters.
  if (pTbNameCalSup->numOfExprs == 0) return;
  if (pBlock == NULL || pBlock->info.rows == 0) return;

  void* pCachedName = NULL;
  if (streamStateGetParName(pState, pBlock->info.groupId, &pCachedName) < 0) {
    pBlock->info.parTbName[0] = 0;
  } else {
    memcpy(pBlock->info.parTbName, pCachedName, TSDB_TABLE_NAME_LEN);
  }
  tdbFree(pCachedName);

  SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
  if (pSrcBlock == NULL) {
    return;
  }
  ASSERT(pSrcBlock->info.rows == 1);

  SSDataBlock* pResBlock = createDataBlock();
  if (pResBlock == NULL) {
    blockDataDestroy(pSrcBlock);
    return;
  }
  pResBlock->info.rowSize = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
  SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
  taosArrayPush(pResBlock->pDataBlock, &data);

  if (blockDataEnsureCapacity(pResBlock, 1) == TSDB_CODE_SUCCESS) {
    projectApplyFunctions(pTbNameCalSup->pExprInfo, pResBlock, pSrcBlock, pTbNameCalSup->pCtx, 1, NULL);
    ASSERT(pResBlock->info.rows == 1);
    ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
    SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
    ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);

    // colDataGetData does not itself signal a NULL cell (for var-length columns it presumably
    // just adds the stored offset to the payload), so test nullness before touching the data.
    void* pData = colDataIsNull_s(pCol, 0) ? NULL : colDataGetData(pCol, 0);
    // TODO check tbname validation
    if (pData != (void*)-1 && pData != NULL) {
      memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
      int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
      memcpy(pBlock->info.parTbName, varDataVal(pData), len);
    } else {
      pBlock->info.parTbName[0] = 0;
    }

    if (pBlock->info.groupId && pBlock->info.parTbName[0]) {
      streamStatePutParName(pState, pBlock->info.groupId, pBlock->info.parTbName);
    }
  }

  blockDataDestroy(pSrcBlock);
  blockDataDestroy(pResBlock);
}

1409 1410
/*
 * Append one row to a stream "special" block (the range/delete block layout).
 *
 * *pStartTs and *pEndTs are written twice: into the START/END columns and into the
 * CALCULATE_START/CALCULATE_END columns. *pUid and *pGp fill the uid and group-id columns.
 * pTbName may be NULL, in which case the table-name cell is stored as NULL.
 * pBlock->info.rows is incremented by one.
 */
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  SArray* pCols = pBlock->pDataBlock;

  colDataAppend(taosArrayGet(pCols, START_TS_COLUMN_INDEX), pBlock->info.rows, (const char*)pStartTs, false);
  colDataAppend(taosArrayGet(pCols, END_TS_COLUMN_INDEX), pBlock->info.rows, (const char*)pEndTs, false);
  colDataAppend(taosArrayGet(pCols, UID_COLUMN_INDEX), pBlock->info.rows, (const char*)pUid, false);
  colDataAppend(taosArrayGet(pCols, GROUPID_COLUMN_INDEX), pBlock->info.rows, (const char*)pGp, false);
  colDataAppend(taosArrayGet(pCols, CALCULATE_START_TS_COLUMN_INDEX), pBlock->info.rows, (const char*)pStartTs,
                false);
  colDataAppend(taosArrayGet(pCols, CALCULATE_END_TS_COLUMN_INDEX), pBlock->info.rows, (const char*)pEndTs, false);

  bool tbNameIsNull = (pTbName == NULL);
  colDataAppend(taosArrayGet(pCols, TABLE_NAME_COLUMN_INDEX), pBlock->info.rows, (const char*)pTbName,
                tbNameIsNull);

  pBlock->info.rows += 1;
}

1428
/*
 * Scan the rows of pBlock and detect rows that touch data which was already processed.
 *
 * updateInfoIsUpdated is called for every row, whether or not `out` is set. The original
 * "must check update info first" note suggests it also records the row, so it stays ahead
 * of the closed-window test.
 *
 * A row is also flagged when all of the following hold:
 *   - its table was inserted before and its timestamp is overdue;
 *   - the window containing it is closed;
 *   - the downstream is a single interval window;
 *   - that window was already deleted from the stream state.
 *
 * When `out` is true:
 *   - pInfo->pUpdateDataRes is cleared on entry.
 *   - Every flagged row is appended to it with group id 0.
 *   - A row flagged because of a deleted window is appended a second time with its computed
 *     group id when partitionSup.needCalc is set.
 *   - If any rows were produced, the block takes pBlock's version and is typed
 *     STREAM_DELETE_DATA (needCalc) or STREAM_CLEAR.
 *
 * `invertible` is currently unused.
 */
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
    // each input row can produce at most two output rows
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
  }

  SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pTsColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* pTs = (TSKEY*)pTsColInfo->pData;

  bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid);

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    bool        inClosedWin = false;

    if (tableInserted && isOverdue(pTs[row], &pInfo->twAggSup)) {
      SResultRowInfo dummyRowInfo;
      dummyRowInfo.cur.pageId = -1;
      win = getActiveTimeWindow(NULL, &dummyRowInfo, pTs[row], &pInfo->interval, TSDB_ORDER_ASC);
      inClosedWin = isCloseWindow(&win, &pInfo->twAggSup);
    }

    // must check update info first.
    bool isUpdated = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, pTs[row]);
    bool inDeletedWin = inClosedWin && isSignleIntervalWindow(pInfo) &&
                        isDeletedStreamWindow(&win, pBlock->info.groupId,
                                              pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup);

    if (!out || !(isUpdated || inDeletedWin)) {
      continue;
    }

    qDebug("stream update check not pass, update %d, closedWin %d", isUpdated, inDeletedWin);
    uint64_t groupId = 0;
    appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pTs + row, pTs + row, &pBlock->info.uid, &groupId,
                                     NULL);
    if (inDeletedWin && pInfo->partitionSup.needCalc) {
      groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, row);
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pTs + row, pTs + row, &pBlock->info.uid, &groupId,
                                       NULL);
    }
  }

  if (!out || pInfo->pUpdateDataRes->info.rows <= 0) {
    return;
  }

  SSDataBlock* pUpdateRes = pInfo->pUpdateDataRes;
  pUpdateRes->info.version = pBlock->info.version;
  blockDataUpdateTsWindow(pUpdateRes, 0);
  pUpdateRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
}
L
Liu Jicong 已提交
1469

1470
/*
 * Copy a block decoded by the tq reader into pInfo->pRes.
 *
 * Output metadata:
 *   - rows, uid and version are taken from pBlock; type is STREAM_NORMAL.
 *   - groupId is looked up from the task's table list.
 *
 * Columns:
 *   - Each output column required by pInfo->matchInfo is copied from the pBlock column with
 *     the same colId.
 *   - A required column missing from pBlock is filled with NULLs.
 *   - Tag pseudo columns (currently only tbname) are then added.
 *   - If `filter` is set, the operator's filter is applied.
 *
 * Finally, the time window is recomputed from the primary timestamp column and the partition
 * table name is computed.
 *
 * Ownership: pBlock's column data is released (blockDataFreeRes) before returning, and also
 * before long-jumping to pTaskInfo->env on error.
 *
 * Returns 0.
 */
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  int32_t code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    // Without the reserved capacity the column copies below would write past the buffers.
    blockDataFreeRes((SSDataBlock*)pBlock);
    T_LONG_JMP(pTaskInfo->env, code);
  }

  pBlockInfo->rows = pBlock->info.rows;
  pBlockInfo->uid = pBlock->info.uid;
  pBlockInfo->type = STREAM_NORMAL;
  pBlockInfo->version = pBlock->info.version;
  pBlockInfo->groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid);

  // todo extract method
  size_t numOfMatchCols = taosArrayGetSize(pInfo->matchInfo.pList);
  size_t numOfSrcCols = blockDataGetNumOfCols(pBlock);
  for (size_t i = 0; i < numOfMatchCols; ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
      continue;
    }

    SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);

    bool colExists = false;
    for (size_t j = 0; j < numOfSrcCols; ++j) {
      SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
      if (pResCol->info.colId == pColMatchInfo->colId) {
        colDataAssign(pDst, pResCol, pBlock->info.rows, pBlockInfo);
        colExists = true;
        break;
      }
    }

    // the required column does not exist in the submit block: fill it with NULL values
    if (!colExists) {
      colDataAppendNNULL(pDst, 0, pBlockInfo->rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
                                  pBlockInfo->rows, GET_TASKID(pTaskInfo), NULL);
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reset the error code.
    terrno = 0;
  }

  if (filter) {
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
  }

  blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
  blockDataFreeRes((SSDataBlock*)pBlock);

  calBlockTbName(pInfo, pInfo->pRes);
  return 0;
}
5
54liuyao 已提交
1533

L
Liu Jicong 已提交
1534
/*
 * Next-block function of the queue (tmq) scan operator.
 *
 * Pushed submit request (streamInfo.pReq != NULL):
 *   - Its data blocks are decoded through the tq reader.
 *   - The first block that still has rows after filtering is returned.
 *   - When the request is exhausted (or cannot be attached to the reader), it is released
 *     and NULL is returned.
 *
 * TMQ_OFFSET__SNAPSHOT_DATA:
 *   - Blocks come from the table (tsdb) scan.
 *   - When that scan yields nothing and no block was returned yet, the tsdb reader is closed
 *     and the offset is reset to the WAL at snapshotVer + 1.
 *   - tqOffsetResetToLog presumably switches prepareStatus to TMQ_OFFSET__LOG, so the log
 *     branch below runs next.
 *
 * TMQ_OFFSET__LOG:
 *   - Blocks are pulled from the WAL until a data block with rows is found.
 *   - If the log is exhausted instead, lastStatus is updated and NULL is returned.
 */
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("queue scan called");

  if (pTaskInfo->streamInfo.pReq != NULL) {
    if (pInfo->tqReader->pMsg == NULL) {
      pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;
      const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;
      if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
        qError("submit msg messed up when initing stream submit block %p", pSubmit);
        pInfo->tqReader->pMsg = NULL;
        pTaskInfo->streamInfo.pReq = NULL;
        ASSERT(0);
        // The reader was not set up for this message. If ASSERT does not abort (e.g. release
        // builds), iterating the reader below is unsafe, so give up on this request.
        return NULL;
      }
    }

    blockDataCleanup(pInfo->pRes);
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

    while (tqNextDataBlock(pInfo->tqReader)) {
      SSDataBlock block = {0};

      int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
      if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
        continue;
      }

      setBlockIntoRes(pInfo, &block, true);

      if (pBlockInfo->rows > 0) {
        return pInfo->pRes;
      }
    }

    pInfo->tqReader->pMsg = NULL;
    pTaskInfo->streamInfo.pReq = NULL;
    return NULL;
  }

  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
      qDebug("queue scan tsdb return %d rows", pResult->info.rows);
      pTaskInfo->streamInfo.returned = 1;
      return pResult;
    }

    if (pTaskInfo->streamInfo.returned) {
      return NULL;
    }

    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;
    tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
    if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
      tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
      return NULL;
    }
    ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
  }

  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
    while (1) {
      SFetchRet ret = {0};
      tqNextBlock(pInfo->tqReader, &ret);
      if (ret.fetchType == FETCH_TYPE__DATA) {
        blockDataCleanup(pInfo->pRes);
        if (setBlockIntoRes(pInfo, &ret.data, true) < 0) {
          ASSERT(0);
        }
        if (pInfo->pRes->info.rows > 0) {
          pOperator->status = OP_EXEC_RECV;
          qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
          return pInfo->pRes;
        }
      } else if (ret.fetchType == FETCH_TYPE__META) {
        // meta blocks are not expected by the queue scan
        ASSERT(0);
      } else if (ret.fetchType == FETCH_TYPE__NONE ||
                 (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
        pTaskInfo->streamInfo.lastStatus = ret.offset;
        ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
        ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
        char formatBuf[80];
        tFormatOffset(formatBuf, sizeof(formatBuf), &ret.offset);
        qDebug("queue scan log return null, offset %s", formatBuf);
        pOperator->status = OP_OPENED;
        return NULL;
      }
    }
  } else {
    ASSERT(0);
    return NULL;
  }
}

L
Liu Jicong 已提交
1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674
/*
 * Copy into pDst the rows of the delete block pSrc whose uid is present in the tq reader's
 * table-id hash (pInfo->tqReader->tbIdHash).
 *
 * Columns of each kept row:
 *   - Start/end timestamps and uid are copied.
 *   - Group id and calculate start/end are set to NULL.
 *   - The table name is set to NULL as well, when pDst has that column.
 *
 * Afterwards pDst->info is taken from pSrc, except that rows is the number of kept rows and
 * pDst's own capacity is preserved.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity; in that case pDst is
 * left untouched.
 */
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;

  int32_t code = blockDataEnsureCapacity(pDst, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  TSKEY*           startCol = (TSKEY*)pSrcStartCol->pData;
  TSKEY*           endCol = (TSKEY*)pSrcEndCol->pData;
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDstGpCol = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  // Rows written by appendOneRowToStreamSpecialBlock always set the table-name cell. Do the same
  // here, so kept rows never carry an unset var-length cell. Guarded in case a block is created
  // without that column.
  SColumnInfoData* pDstTbNameCol = NULL;
  if (taosArrayGetSize(pDst->pDataBlock) > TABLE_NAME_COLUMN_INDEX) {
    pDstTbNameCol = taosArrayGet(pDst->pDataBlock, TABLE_NAME_COLUMN_INDEX);
  }

  int32_t j = 0;
  for (int32_t i = 0; i < rows; i++) {
    if (taosHashGet(pReader->tbIdHash, &uidCol[i], sizeof(uint64_t)) == NULL) {
      continue;
    }

    colDataAppend(pDstStartCol, j, (const char*)&startCol[i], false);
    colDataAppend(pDstEndCol, j, (const char*)&endCol[i], false);
    colDataAppend(pDstUidCol, j, (const char*)&uidCol[i], false);

    colDataAppendNULL(pDstGpCol, j);
    colDataAppendNULL(pDstCalStartCol, j);
    colDataAppendNULL(pDstCalEndCol, j);
    if (pDstTbNameCol != NULL) {
      colDataAppendNULL(pDstTbNameCol, j);
    }
    j++;
  }

  uint32_t cap = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.rows = j;
  pDst->info.capacity = cap;

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706
// for partition by tag
/*
 * Fill the group-id column of a special (delete/update range) block from the uid column.
 *
 * Only handles the case where the group id can be derived from the table uid
 * (partitionSup.needCalc == false): each row gets getGroupIdByUid(pInfo, uid).
 * When partitioning requires expression evaluation, the block is left unchanged (see TODO).
 */
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    // TODO: the group id would have to be recomputed from the previous version of each row
    // (e.g. readPreVersionData on the table scan followed by calGroupIdByData); not implemented.
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataAppend(pGpCol, i, (const char*)&groupId, false);
  }
}

L
Liu Jicong 已提交
1707 1708 1709 1710 1711
/*
 * Next-block function of the stream scan operator.
 *
 * Sources are checked in this order:
 *
 *   1. History recovery (streamInfo.recoverStep).
 *      - PREPARE1/PREPARE2 point the table scan at the fill-history version range.
 *      - SCAN returns its blocks, computing the partition table name and feeding the update
 *        info for each block.
 *      - When the scan is exhausted, recovery is finished and NULL is returned.
 *
 *   2. STREAM_INPUT__DATA_BLOCK: buffered blocks in pInfo->pBlockLists are returned one by one.
 *      - Retrieve blocks switch the operator into a retrieve range scan.
 *      - Delete blocks are turned either into delete results or into range scans.
 *
 *   3. STREAM_INPUT__DATA_SUBMIT: pending work selected by pInfo->scanMode is served first.
 *      - That work is update/delete ranges or range scans over the data reader.
 *      - After that, buffered submit messages are decoded through the tq reader, checked for
 *        updates and filtered.
 *
 * NOTE: this operator never checks whether its status is done.
 * Returns NULL when the current input is exhausted.
 */
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("stream scan called");

  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
      qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
    } else {
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
      qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
    }

    // Drop the current reader; presumably the next doTableScan opens a new one with the
    // version range set above.
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;

    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
  }

  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
    SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
    if (pBlock != NULL) {
      calBlockTbName(pInfo, pBlock);
      if (pInfo->pUpdateInfo) {
        TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
        pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
      }
      qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
      printDataBlock(pBlock, "scan recover");
      return pBlock;
    }

    // recovery scan exhausted: release the reader and clear the version range
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;

    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;

    return NULL;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);

// TODO: refactor
FETCH_NEXT_BLOCK:
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
    if (pInfo->validBlockIndex >= total) {
      doClearBufferedBlocks(pInfo);
      return NULL;
    }

    int32_t      current = pInfo->validBlockIndex++;
    SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
    if (pBlock->info.groupId && pBlock->info.parTbName[0]) {
      streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, pBlock->info.parTbName);
    }
    // TODO move into scan
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
    blockDataUpdateTsWindow(pBlock, 0);
    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
        copyDataBlock(pInfo->pUpdateRes, pBlock);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
        printDataBlock(pBlock, "stream scan delete recv");
        // pDelBlock is owned by this function only when it was created here, i.e. when tqReader
        // is set. Otherwise it is the buffered input block itself and must not be destroyed.
        SSDataBlock* pDelBlock = NULL;
        if (pInfo->tqReader) {
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
        } else {
          pDelBlock = pBlock;
        }
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
          printDataBlock(pInfo->pDeleteDataRes, "stream scan delete result");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }

          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
        }
      } break;
      default:
        break;
    }
    return pBlock;
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
    qDebug("scan mode %d", pInfo->scanMode);
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        // pUpdateRes is a long-lived buffer reused by the range-scan paths: empty it, never
        // destroy it (destroying would leave pInfo->pUpdateRes dangling).
        blockDataCleanup(pInfo->pUpdateRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        return pInfo->pRes;
      }
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      }
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      }
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
    }

    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
      return pInfo->pUpdateRes;
    }

    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);

  NEXT_SUBMIT_BLK:
    while (1) {
      if (pInfo->tqReader->pMsg == NULL) {
        if (pInfo->validBlockIndex >= totBlockNum) {
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
          doClearBufferedBlocks(pInfo);
          return NULL;
        }

        int32_t     current = pInfo->validBlockIndex++;
        SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
        if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          pInfo->tqReader->pMsg = NULL;
          continue;
        }
      }

      blockDataCleanup(pInfo->pRes);

      while (tqNextDataBlock(pInfo->tqReader)) {
        SSDataBlock block = {0};

        int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

        setBlockIntoRes(pInfo, &block, false);

        if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId,
                             pInfo->pRes->info.version)) {
          printDataBlock(pInfo->pRes, "stream scan ignore");
          blockDataCleanup(pInfo->pRes);
          continue;
        }

        if (pInfo->pUpdateInfo) {
          checkUpdateData(pInfo, true, pInfo->pRes, true);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
          if (pInfo->pUpdateDataRes->info.rows > 0) {
            pInfo->updateResIndex = 0;
            if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
              pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
            } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
              pInfo->scanMode = STREAM_SCAN_FROM_RES;
              return pInfo->pUpdateDataRes;
            } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
              pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
            }
          }
        }

        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
          break;
        }
      }

      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
        break;
      }

      // current submit message exhausted: move to the next one
      pInfo->tqReader->pMsg = NULL;
    }

    // record the scan action.
    pInfo->numOfExec++;
    pOperator->resultInfo.totalRows += pBlockInfo->rows;

    qDebug("scan rows: %d", pBlockInfo->rows);
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
  } else {
    ASSERT(0);
    return NULL;
  }
}

H
Haojun Liao 已提交
2021
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2022 2023 2024
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2025 2026 2027
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2028 2029 2030 2031 2032 2033
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2034
/*
 * Next-block function of the raw scan operator (TMQ snapshot subscription of a
 * database or super table).
 *
 * What is done depends on pTaskInfo->streamInfo.prepareStatus.type:
 *  - TMQ_OFFSET__SNAPSHOT_DATA: return the next data block of the snapshot
 *    table reader. When there is none, ask the snapshot context for the next
 *    table uid. A uid of 0 means the snapshot has been read completely, and
 *    lastStatus is switched to TMQ_OFFSET__LOG at the snapshot version.
 *    Otherwise the scan is re-prepared for that uid. NULL is returned in both
 *    cases.
 *  - TMQ_OFFSET__SNAPSHOT_META: fetch one meta entry from the snapshot context.
 *    If sContext->queryMetaOrData is false, the response offset is set to
 *    TMQ_OFFSET__SNAPSHOT_DATA so that the next poll request reads data.
 *    Otherwise the fetched entry is published through streamInfo.metaRsp.
 *    Always returns NULL.
 *  - any other type: nothing is done and NULL is returned.
 *
 * metaRsp.metaRspLen is reset to 0 on entry. A non-zero value after the call
 * marks the result as meta rather than data.
 * NOTE: this operator never checks whether its own status is OP_EXEC_DONE.
 * Errors while reading data are reported by long-jumping to pTaskInfo->env.
 */
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  SStreamRawScanInfo* pInfo = pOperator->info;

  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;  // use metaRspLen != 0 to judge if data is meta
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;

  qDebug("tmqsnap doRawScan called");
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pBlock = &pInfo->pRes;

    if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
      if (isTaskKilled(pTaskInfo)) {
        T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
      }

      int32_t rows = 0;
      tsdbRetrieveDataBlockInfo(pInfo->dataReader, &rows, &pBlock->info.uid, &pBlock->info.window);
      pBlock->info.rows = rows;

      SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
      pBlock->pDataBlock = pCols;
      if (pCols == NULL) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }

      // info.uid is filled through a uint64_t*, so print it as unsigned
      qDebug("tmqsnap doRawScan get data uid:%" PRIu64 "", pBlock->info.uid);
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid;
      pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
      return pBlock;
    }

    SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
    if (mtInfo.uid == 0) {  // read snapshot done, change to get data from wal
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
      pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
    } else {
      pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
      pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
      qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
      qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
    }
    tDeleteSSchemaWrapper(mtInfo.schema);
    qDebug("tmqsnap stream scan tsdb return null");
    return NULL;
  } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
    SSnapContext* sContext = pInfo->sContext;
    void*         data = NULL;
    int32_t       dataLen = 0;
    int16_t       type = 0;
    int64_t       uid = 0;
    if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
      qError("tmqsnap getMetafromSnapShot error");
      taosMemoryFreeClear(data);
      return NULL;
    }

    if (!sContext->queryMetaOrData) {  // change to get data next poll request
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
      pTaskInfo->streamInfo.lastStatus.uid = uid;
      pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
      pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
    } else {
      // publish the fetched meta entry; ownership of data moves to metaRsp
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
      pTaskInfo->streamInfo.lastStatus.uid = uid;
      pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
      pTaskInfo->streamInfo.metaRsp.resMsgType = type;
      pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
      pTaskInfo->streamInfo.metaRsp.metaRsp = data;
    }

    return NULL;
  }

  // Other prepare-status types (e.g. TMQ_OFFSET__LOG) are not served by this operator.
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2152
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2153 2154 2155 2156 2157 2158
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2159 2160 2161
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2162
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2163 2164 2165 2166 2167
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2168 2169
  int32_t code = TSDB_CODE_SUCCESS;

2170
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2171
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2172
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2173 2174
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2175 2176
  }

wmmhello's avatar
wmmhello 已提交
2177 2178
  pInfo->vnode = pHandle->vnode;

2179
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2180 2181
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2182

H
Haojun Liao 已提交
2183
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL);
2184
  return pOperator;
H
Haojun Liao 已提交
2185

L
Liu Jicong 已提交
2186
_end:
H
Haojun Liao 已提交
2187 2188 2189 2190
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2191 2192
}

2193
static void destroyStreamScanOperatorInfo(void* param) {
2194 2195
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
5
54liuyao 已提交
2196
    destroyOperatorInfo(pStreamScan->pTableScanOp);
2197 2198 2199 2200
  }
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
H
Haojun Liao 已提交
2201 2202
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2203
  }
C
Cary Xu 已提交
2204 2205
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2206
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2207
  }
C
Cary Xu 已提交
2208

L
Liu Jicong 已提交
2209 2210
  cleanupExprSupp(&pStreamScan->tbnameCalSup);

L
Liu Jicong 已提交
2211
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2212 2213 2214 2215
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2216
  blockDataDestroy(pStreamScan->pUpdateDataRes);
2217 2218 2219 2220
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2221
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2222
                                            SExecTaskInfo* pTaskInfo) {
2223 2224
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2225

H
Haojun Liao 已提交
2226 2227
  if (pInfo == NULL || pOperator == NULL) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
2228
    goto _error;
H
Haojun Liao 已提交
2229 2230
  }

2231
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2232
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2233

2234
  pInfo->pTagCond = pTagCond;
2235
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2236

2237
  int32_t numOfCols = 0;
2238 2239
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2240 2241 2242
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2243

H
Haojun Liao 已提交
2244
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
2245
  SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2246
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2247
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2248 2249

    int16_t colId = id->colId;
2250
    taosArrayPush(pColIds, &colId);
2251
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2252
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2253
    }
H
Haojun Liao 已提交
2254 2255
  }

L
Liu Jicong 已提交
2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
      goto _error;
    }
  }

2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
    SExprInfo* pTagExpr = createExprInfo(pTableScanNode->pTags, NULL, &numOfTags);
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  }

H
Haojun Liao 已提交
2282 2283
  pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
  if (pInfo->pBlockLists == NULL) {
2284 2285
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
H
Haojun Liao 已提交
2286 2287
  }

5
54liuyao 已提交
2288
  if (pHandle->vnode) {
L
Liu Jicong 已提交
2289
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
L
Liu Jicong 已提交
2290
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2291
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2292
      pTSInfo->base.cond.endVersion = pHandle->version;
2293
    }
L
Liu Jicong 已提交
2294

2295
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2296
    int32_t        num = 0;
H
Haojun Liao 已提交
2297
    tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num);
2298

2299
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2300
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2301 2302
      pTSInfo->base.dataReader = NULL;
      code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, &pTSInfo->base.dataReader, NULL);
dengyihao's avatar
dengyihao 已提交
2303 2304
      if (code != 0) {
        terrno = code;
H
Haojun Liao 已提交
2305
        destroyTableScanOperatorInfo(pTableScanOp);
2306
        goto _error;
L
Liu Jicong 已提交
2307
      }
L
Liu Jicong 已提交
2308 2309
    }

L
Liu Jicong 已提交
2310 2311 2312 2313
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2314
    } else {
L
Liu Jicong 已提交
2315 2316
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2317 2318
    }

2319
    pInfo->pUpdateInfo = NULL;
2320
    pInfo->pTableScanOp = pTableScanOp;
2321 2322 2323
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2324

L
Liu Jicong 已提交
2325 2326
    pInfo->readHandle = *pHandle;
    pInfo->tableUid = pScanPhyNode->uid;
L
Liu Jicong 已提交
2327
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
L
Liu Jicong 已提交
2328

L
Liu Jicong 已提交
2329
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2330
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
H
Haojun Liao 已提交
2331
    SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList);
2332
    code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2333 2334 2335 2336 2337
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2338
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2339 2340
  } else {
    taosArrayDestroy(pColIds);
5
54liuyao 已提交
2341 2342
  }

2343 2344 2345 2346 2347
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2348 2349 2350 2351 2352
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

2353
  pInfo->pRes = createResDataBlock(pDescNode);
2354
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2355
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2356
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2357
  pInfo->groupId = 0;
2358
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2359
  pInfo->pStreamScanOp = pOperator;
2360
  pInfo->deleteDataIndex = 0;
2361
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2362
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2363
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2364
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2365
  pInfo->partitionSup.needCalc = false;
L
Liu Jicong 已提交
2366

L
Liu Jicong 已提交
2367 2368
  setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2369
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2370

L
Liu Jicong 已提交
2371
  __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
H
Haojun Liao 已提交
2372
  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, NULL);
2373

H
Haojun Liao 已提交
2374
  return pOperator;
2375

L
Liu Jicong 已提交
2376
_error:
H
Haojun Liao 已提交
2377 2378 2379 2380 2381 2382 2383 2384
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2385 2386
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2387 2388
}

2389
/*
 * Next-block function of the tag scan operator.
 *
 * Produces one row per table of pTaskInfo->pTableInfoList. It resumes at
 * pInfo->curPos and emits at most pOperator->resultInfo.capacity rows per call.
 * For each output expression, a scan pseudo-column function yields the table
 * name; any other expression yields the value of the tag whose column id it
 * references. Once every table has been visited, the operator and the task are
 * marked completed. Returns NULL when no row was produced. Meta read failures
 * long-jump to pTaskInfo->env.
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprs = &pOperator->exprSupp.pExprInfo[0];
  SSDataBlock*   pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
  if (numOfTables == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        nameBuf[512] = {0};
  int32_t     numOfRows = 0;
  SMetaReader mr = {0};
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < numOfTables && numOfRows < pOperator->resultInfo.capacity) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->curPos);
    int32_t        code = metaGetTableEntryByUid(&mr, pKeyInfo->uid);
    tDecoderClear(&mr.coder);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKeyInfo->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&mr);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
      SExprInfo*       pExpr = &pExprs[k];
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExpr->base.resSchema.slotId);

      // table name column
      if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
        STR_TO_VARSTR(nameBuf, mr.me.name);
        colDataAppend(pDst, numOfRows, nameBuf, false);
        continue;
      }

      // tag value column
      STagVal tagVal = {0};
      tagVal.cid = pExpr->base.pParam[0].pCol->colId;
      const char* pTagRaw = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &tagVal);

      bool  isJson = (pDst->info.type == TSDB_DATA_TYPE_JSON);
      char* pData = (!isJson && pTagRaw != NULL) ? tTagValToData((const STagVal*)pTagRaw, false) : (char*)pTagRaw;

      bool isNull = (pData == NULL) || (isJson && tTagIsJsonNull(pData));
      colDataAppend(pDst, numOfRows, pData, isNull);

      // the converted value of a var-length, non-json tag is released here
      if (!isJson && pTagRaw != NULL && IS_VAR_DATA_TYPE(((const STagVal*)pTagRaw)->type) && pData != NULL) {
        taosMemoryFree(pData);
      }
    }

    numOfRows += 1;
    pInfo->curPos += 1;
    if (pInfo->curPos >= numOfTables) {
      setOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&mr);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = numOfRows;
  pOperator->resultInfo.totalRows += numOfRows;

  if (pRes->info.rows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

2470
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2471 2472
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2473
  taosArrayDestroy(pInfo->matchInfo.pList);
D
dapan1121 已提交
2474
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2475 2476
}

2477 2478
/*
 * Create the tag scan operator. It outputs the table name and/or tag values
 * of every table in the task's table list (see doTagScan).
 *
 * pTableListInfo is not referenced here; doTagScan reads the table list from
 * pTaskInfo instead.
 * Returns NULL on failure, with terrno set to the error that caused it
 * (TSDB_CODE_OUT_OF_MEMORY when the operator itself cannot be allocated).
 */
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
  int32_t        code = TSDB_CODE_OUT_OF_MEMORY;
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pRes = createResDataBlock(pDescNode);
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;

  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  initResultSizeInfo(&pOperator->resultInfo, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, NULL);

  return pOperator;

_error:
  // release whatever was set up before the failure
  if (pInfo != NULL) {
    taosArrayDestroy(pInfo->matchInfo.pList);
  }
  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
  }
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  // report the actual cause instead of always claiming out-of-memory
  terrno = code;
  return NULL;
}
2519

dengyihao's avatar
dengyihao 已提交
2520
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
  STableMergeScanInfo*            pTableScanInfo = pOperator->info;

  SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx);
  blockDataCleanup(pBlock);

  int64_t st = taosGetTimestampUs();

H
Haojun Liao 已提交
2534
  void*        p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2535
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2536

H
Haojun Liao 已提交
2537
  int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
dengyihao's avatar
dengyihao 已提交
2538
  if (code != 0) {
H
Haojun Liao 已提交
2539
    T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
dengyihao 已提交
2540
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2541

H
Haojun Liao 已提交
2542
  STsdbReader* reader = pInfo->base.dataReader;
dengyihao's avatar
opt mem  
dengyihao 已提交
2543
  while (tsdbNextDataBlock(reader)) {
H
Haojun Liao 已提交
2544 2545
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
dengyihao's avatar
opt mem  
dengyihao 已提交
2546 2547 2548 2549 2550 2551 2552 2553 2554 2555
    }

    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

    blockDataCleanup(pBlock);

H
Haojun Liao 已提交
2556 2557 2558 2559
    int32_t rows = 0;
    tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window);
    blockDataEnsureCapacity(pBlock, rows);
    pBlock->info.rows = rows;
dengyihao's avatar
opt mem  
dengyihao 已提交
2560

H
Haojun Liao 已提交
2561
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2562 2563 2564 2565
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2566 2567

    uint32_t status = 0;
H
Haojun Liao 已提交
2568 2569
    loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
//    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2570
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2571
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2572 2573 2574 2575 2576 2577 2578
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

H
Haojun Liao 已提交
2579
    pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2580

H
Haojun Liao 已提交
2581
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2582
    pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2583

H
Haojun Liao 已提交
2584 2585
    tsdbReaderClose(pInfo->base.dataReader);
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2586 2587
    return pBlock;
  }
H
Haojun Liao 已提交
2588

H
Haojun Liao 已提交
2589 2590
  tsdbReaderClose(pInfo->base.dataReader);
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2591 2592 2593
  return NULL;
}

2594 2595 2596
/*
 * Build a one-entry sort specification that orders blocks by the primary
 * timestamp column, in the given order with nulls first.
 *
 * colMatchInfo is an array of SColMatchItem. The sort slot is the dstSlotId of
 * the last item whose colId is PRIMARYKEY_TIMESTAMP_COL_ID, or 0 if there is
 * no such item. The caller owns the returned array of SBlockOrderInfo.
 */
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsSlotId = 0;

  // scan backwards so that the first hit is the last matching item
  for (int32_t idx = (int32_t)taosArrayGetSize(colMatchInfo) - 1; idx >= 0; --idx) {
    SColMatchItem* pItem = taosArrayGet(colMatchInfo, idx);
    if (pItem->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      tsSlotId = pItem->dstSlotId;
      break;
    }
  }

  SBlockOrderInfo orderInfo = {.order = order, .slotId = tsSlotId, .nullFirst = NULL_ORDER_FIRST};

  SArray* pOrderList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  taosArrayPush(pOrderList, &orderInfo);
  return pOrderList;
}

dengyihao's avatar
opt mem  
dengyihao 已提交
2614 2615 2616 2617 2618 2619 2620 2621
/*
 * Copy a query table data condition.
 *
 * *src is copied shallowly into *dst. dst->colList is then replaced by a newly
 * allocated copy of the src->numOfCols column descriptors, which the caller
 * must free.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (also stored in terrno)
 * when the column list cannot be allocated. dst->colList is NULL in that case.
 */
int32_t dumpSQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
  *dst = *src;

  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  if (dst->colList == NULL && src->numOfCols > 0) {
    // previously the copy loop below wrote through the NULL pointer
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < src->numOfCols; ++i) {
    dst->colList[i] = src->colList[i];
  }
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2622

2623
/*
 * Start merge-scanning the table group that begins at pInfo->tableStartIndex.
 *
 * The steps are:
 *  - Set pInfo->tableEndIndex to the last consecutive table of the task's
 *    table list that shares pInfo->groupId.
 *  - Create a multi-source sort handle with one source per table of the group.
 *    Every source is fed by getTableDataBlockImpl and has its own copy of the
 *    query condition.
 *  - Open the sort handle.
 *
 * Errors are reported by long-jumping to pTaskInfo->env; otherwise the
 * function returns TSDB_CODE_SUCCESS.
 */
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  {
    size_t  numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < numOfTables; ++i) {
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  pInfo->base.dataReader = NULL;

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  for (int32_t i = 0; i < numOfTable; ++i) {
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    taosArrayPush(pInfo->sortSourceParams, &param);

    SQueryTableDataCond cond;
    int32_t             dumpCode = dumpSQueryTableCond(&pInfo->base.cond, &cond);
    if (dumpCode != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, dumpCode);
    }
    taosArrayPush(pInfo->queryConds, &cond);
  }

  for (int32_t i = 0; i < numOfTable; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      // previously dereferenced unconditionally
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    ps->onlyRef = true;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Tears down the per-group state built for the current table group:
 *   - folds the sort handle's execution statistics into pInfo->sortExecInfo
 *     (sortMethod/sortBuffer reflect the latest group; loops/readBytes/writeBytes accumulate),
 *   - destroys each sort source's input block and empties pInfo->sortSourceParams
 *     (the array itself is kept for reuse by the next group),
 *   - destroys the sort handle,
 *   - frees the per-table query conditions and their column lists.
 *
 * Each array is iterated by its own length so that a mismatch between sortSourceParams and
 * queryConds can never cause an out-of-range access or a leak.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;

  SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
  pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
  pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
  pInfo->sortExecInfo.loops += sortExecInfo.loops;
  pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
  pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;

  size_t numOfSources = taosArrayGetSize(pInfo->sortSourceParams);
  for (size_t i = 0; i < numOfSources; ++i) {
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    blockDataDestroy(param->inputBlock);
    param->inputBlock = NULL;
  }
  taosArrayClear(pInfo->sortSourceParams);

  tsortDestroySortHandle(pInfo->pSortHandle);
  pInfo->pSortHandle = NULL;

  size_t numOfConds = taosArrayGetSize(pInfo->queryConds);
  for (size_t i = 0; i < numOfConds; ++i) {
    SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
    taosMemoryFree(cond->colList);
  }
  taosArrayDestroy(pInfo->queryConds);
  pInfo->queryConds = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2718 2719
/*
 * Fills pResBlock with up to `capacity` tuples pulled from the sort handle, then applies the
 * operator's LIMIT/OFFSET (pInfo->limitInfo) to the batch.
 *
 * When OFFSET discards every row of a batch while the sort handle still has tuples, the next
 * batch is read instead of returning NULL. The caller (doTableMergeScan) treats NULL as
 * "current group exhausted", so an empty-by-offset batch must not be reported as the end of
 * the group; doing so would silently drop the rest of the group's rows.
 *
 * Returns pResBlock when it holds at least one row. Otherwise returns NULL: either the sort
 * handle is drained or the limit left nothing to output.
 */
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  while (1) {
    blockDataCleanup(pResBlock);

    STupleHandle* pTupleHandle = NULL;
    while (1) {
      pTupleHandle = tsortNextTuple(pHandle);
      if (pTupleHandle == NULL) {
        break;
      }

      appendOneRowToDataBlock(pResBlock, pTupleHandle);
      if (pResBlock->info.rows >= capacity) {
        break;
      }
    }

    qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);
    bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator);
    pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows;

    // Stop when rows survived LIMIT/OFFSET, the sort handle is drained, or the limit was hit;
    // otherwise OFFSET consumed the whole batch and more sorted tuples are still available.
    if (pResBlock->info.rows > 0 || pTupleHandle == NULL || limitReached) {
      break;
    }
  }

  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
}

/*
 * `next` callback of the table merge scan operator.
 *
 * Tables are consumed one group at a time. pInfo->tableStartIndex is the first table of the
 * group currently being merge-sorted and pInfo->tableEndIndex its last. Each call returns one
 * sorted block stamped with the current group id. When a group runs dry, its sort state is
 * torn down and the group starting right after tableEndIndex is opened.
 *
 * Returns NULL once every group has been drained, or when the table list is empty; the
 * operator is then marked completed. A failing open function long-jumps out through
 * pTaskInfo->env.
 */
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  int32_t openCode = pOperator->fpSet._openFn(pOperator);
  if (openCode != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, openCode);
  }

  size_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);

  // First invocation: open the group that begins at table 0.
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;

    if (numOfTables == 0) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pInfo->tableStartIndex = 0;
    pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex))->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pInfo->tableStartIndex < numOfTables) {
    SSDataBlock* pSorted = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock,
                                                            pOperator->resultInfo.capacity, pOperator);
    if (pSorted != NULL) {
      pSorted->info.groupId = pInfo->groupId;
      pOperator->resultInfo.totalRows += pSorted->info.rows;
      return pSorted;
    }

    // The current group is drained: release its sort state before deciding what comes next.
    stopGroupTableMergeScan(pOperator);

    if (pInfo->tableEndIndex >= numOfTables - 1) {
      // That group ended at the last table of the list; nothing is left to scan.
      setOperatorCompleted(pOperator);
      break;
    }

    pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
    pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId;
    startGroupTableMergeScan(pOperator);
  }

  return NULL;
}

2793
void destroyTableMergeScanOperatorInfo(void* param) {
2794
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2795
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2796

dengyihao's avatar
dengyihao 已提交
2797 2798 2799
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2800 2801
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
2802
  }
H
Haojun Liao 已提交
2803

2804
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2805 2806
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2807

H
Haojun Liao 已提交
2808 2809
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2810

dengyihao's avatar
opt mem  
dengyihao 已提交
2811 2812 2813
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2814
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2815
  taosArrayDestroy(pTableScanInfo->queryConds);
2816

H
Haojun Liao 已提交
2817 2818
  if (pTableScanInfo->base.matchInfo.pList != NULL) {
    taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
2819 2820 2821 2822 2823 2824
  }

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
H
Haojun Liao 已提交
2825
  cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
L
Liu Jicong 已提交
2826

H
Haojun Liao 已提交
2827 2828 2829 2830
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
  taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);

D
dapan1121 已提交
2831
  taosMemoryFreeClear(param);
2832 2833 2834 2835
}

/*
 * Builds the EXPLAIN payload for this operator. The payload is a heap-allocated
 * STableMergeScanExecInfo holding the reader's block-load statistics and the accumulated sort
 * statistics.
 *
 * On success, *pOptrExplain receives the allocation and *len its size. Ownership passes to the
 * caller (presumably freed by the explain framework — confirm).
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the allocation fails; in that case
 * *pOptrExplain is set to NULL and *len to 0.
 */
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->base.readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2848 2849
/*
 * Creates the table merge scan operator for a table-scan physical plan node.
 *
 * Setup steps:
 *   - column match info and the base query condition;
 *   - optional pseudo-column expressions;
 *   - scan order counts, the table-meta LRU cache and the sample settings;
 *   - the filter, result block, sort-source params array and ts sort info;
 *   - the sort input block, LIMIT/SLIMIT info and the sort page size.
 *
 * Returns the operator, or NULL on failure. On failure pTaskInfo->code holds the failing
 * call's error code; TSDB_CODE_OUT_OF_MEMORY is used for allocation failures or when no code
 * was reported. Everything set up before the failure is released.
 *
 * `code` is declared before the first `goto _error`. The original declared it afterwards, so
 * the jump skipped its initialization.
 */
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
                                                SExecTaskInfo* pTaskInfo) {
  int32_t              code = TSDB_CODE_SUCCESS;
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
                             &pInfo->base.matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;  // matchInfo.pList is released in _error
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    SExprSupp* pSup = &pInfo->base.pseudoSup;
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
  pInfo->base.readHandle = *readHandle;

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;

  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultSizeInfo(&pOperator->resultInfo, 1024);
  pInfo->pResBlock = createResDataBlock(pDescNode);
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));

  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);

  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);

  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->exprSupp.numOfExprs = numOfCols;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         getTableMergeScanExplainExecInfo);
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  if (pInfo != NULL) {
    // pInfo was zeroed by calloc, so fields not yet set up are NULL and these calls are no-ops
    // for them. The destructor applies the same cleanups to the same fields.
    taosArrayDestroy(pInfo->base.matchInfo.pList);
    cleanupQueryTableDataCond(&pInfo->base.cond);
    cleanupExprSupp(&pInfo->base.pseudoSup);
    if (pInfo->base.metaCache.pTableMetaEntryCache != NULL) {
      taosLRUCacheCleanup(pInfo->base.metaCache.pTableMetaEntryCache);
    }
  }
  // Report the real failure; fall back to OOM when the failing call did not provide a code.
  pTaskInfo->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}