scanoperator.c 130.3 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

D
dapan1121 已提交
34 35
int32_t scanDebug = 0;

X
Xiaoyu Wang 已提交
36
#define MULTI_READER_MAX_TABLE_NUM   5000
H
Haojun Liao 已提交
37
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
38
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
39

H
Haojun Liao 已提交
40 41 42 43 44 45 46 47 48
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;
  SSortExecInfo          sortExecInfo;
} STableMergeScanExecInfo;

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
  uint64_t       uid;
X
Xiaoyu Wang 已提交
49
  SSDataBlock*   inputBlock;
D
dapan1121 已提交
50
  bool           multiReader;
51
  STsdbReader*   dataReader;
H
Haojun Liao 已提交
52 53
} STableMergeScanSortSourceParam;

54 55 56 57 58 59 60 61 62 63
typedef struct STableCountScanOperatorInfo {
  SReadHandle  readHandle;
  SSDataBlock* pRes;

  STableCountScanSupp supp;

  int32_t currGrpIdx;
  SArray* stbUidList;  // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;

L
Liu Jicong 已提交
64
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
65

H
Haojun Liao 已提交
66
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
67 68 69 70 71 72 73 74 75 76 77 78
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#else
  return true;
#endif
}

79
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
H
Haojun Liao 已提交
80 81 82 83 84
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SWITCH_ORDER(pCtx[i].order);
  }
}

85 86 87 88 89 90 91 92 93
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
  if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  int64_t key = tw->skey, interval = pInterval->interval;
94
  // convert key to second
95 96 97 98 99 100 101
  key = convertTimePrecision(key, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  if (pInterval->intervalUnit == 'y') {
    interval *= 12;
  }

  struct tm tm;
102
  time_t    t = (time_t)key;
103
  taosLocalTime(&t, &tm, NULL);
104 105 106 107

  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
108
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
109 110 111 112

  mon = (int)(mon + interval);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
113
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
114 115 116 117

  tw->ekey -= 1;
}

118
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
119 120 121 122 123 124 125
  STimeWindow w = {0};

  // 0 by default, which means it is not a interval operator of the upstream operator.
  if (pInterval->interval == 0) {
    return false;
  }

126
  if (order == TSDB_ORDER_ASC) {
127
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
128
    ASSERT(w.ekey >= pBlockInfo->window.skey);
129

130
    if (w.ekey < pBlockInfo->window.ekey) {
131 132 133
      return true;
    }

134 135
    while (1) {
      getNextTimeWindow(pInterval, &w, order);
136 137 138 139
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

140
      ASSERT(w.ekey > pBlockInfo->window.ekey);
141
      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
142 143 144 145
        return true;
      }
    }
  } else {
146
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
147
    ASSERT(w.skey <= pBlockInfo->window.ekey);
148

149
    if (w.skey > pBlockInfo->window.skey) {
150 151 152
      return true;
    }

153
    while (1) {
154 155 156 157 158 159
      getNextTimeWindow(pInterval, &w, order);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      assert(w.skey < pBlockInfo->window.skey);
160
      if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
161 162 163
        return true;
      }
    }
164 165 166 167 168
  }

  return false;
}

169 170 171 172 173 174 175 176 177 178 179
// this function is for table scanner to extract temporary results of upstream aggregate results.
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  int64_t buf[2] = {0};
  SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId);

  STableScanInfo* pTableScanInfo = pOperator->info;

S
slzhou 已提交
180 181
  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable,
                                                               buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
182 183 184 185 186

  if (p1 == NULL) {
    return NULL;
  }

H
Haojun Liao 已提交
187
  *pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId);
188 189 190
  if (NULL == *pPage) {
    return NULL;
  }
L
Liu Jicong 已提交
191

192 193 194 195 196 197
  return (SResultRow*)((char*)(*pPage) + p1->offset);
}

static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pTableScanInfo = pOperator->info;

H
Haojun Liao 已提交
198
  if (pTableScanInfo->base.pdInfo.pExprSup == NULL) {
199 200 201
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
202
  SExprSupp* pSup1 = pTableScanInfo->base.pdInfo.pExprSup;
203 204

  SFilePage*  pPage = NULL;
H
Haojun Liao 已提交
205
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
206 207 208 209 210 211 212 213 214

  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool notLoadBlock = true;
  for (int32_t i = 0; i < pSup1->numOfExprs; ++i) {
    int32_t functionId = pSup1->pCtx[i].functionId;

H
Haojun Liao 已提交
215
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset);
216 217 218 219 220 221 222 223 224

    int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window);
    if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) {
      notLoadBlock = false;
      break;
    }
  }

  // release buffer pages
H
Haojun Liao 已提交
225
  releaseBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, pPage);
226 227 228 229 230 231 232 233

  if (notLoadBlock) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
234
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
235
                               int32_t numOfRows) {
H
Haojun Liao 已提交
236
  if (pColsAgg == NULL || pFilterInfo == NULL) {
H
Haojun Liao 已提交
237 238 239
    return true;
  }

H
Haojun Liao 已提交
240
  bool keep = filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
H
Haojun Liao 已提交
241 242 243
  return keep;
}

H
Haojun Liao 已提交
244
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
245
  bool    allColumnsHaveAgg = true;
246
  int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, pBlock, &allColumnsHaveAgg);
H
Haojun Liao 已提交
247
  if (code != TSDB_CODE_SUCCESS) {
248
    T_LONG_JMP(pTaskInfo->env, code);
H
Haojun Liao 已提交
249 250 251 252 253 254 255 256
  }

  if (!allColumnsHaveAgg) {
    return false;
  }
  return true;
}

H
Haojun Liao 已提交
257
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
258
                               int32_t rows) {
H
Haojun Liao 已提交
259 260 261
  if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
    SExprSupp* pSup = &pTableScanInfo->pseudoSup;

262
    int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
263
                                          GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);
H
Haojun Liao 已提交
264
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
H
Haojun Liao 已提交
265
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
H
Haojun Liao 已提交
266 267
      T_LONG_JMP(pTaskInfo->env, code);
    }
H
Haojun Liao 已提交
268 269 270

    // reset the error code.
    terrno = 0;
H
Haojun Liao 已提交
271 272 273
  }
}

274
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
275
  SLimit*     pLimit = &pLimitInfo->limit;
H
Haojun Liao 已提交
276
  const char* id = GET_TASKID(pTaskInfo);
277

278
  if (pLimitInfo->remainOffset > 0) {
279 280
    if (pLimitInfo->remainOffset >= pBlock->info.rows) {
      pLimitInfo->remainOffset -= pBlock->info.rows;
H
Haojun Liao 已提交
281
      blockDataEmpty(pBlock);
H
Haojun Liao 已提交
282
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
283
      return false;
284
    } else {
285
      blockDataTrimFirstRows(pBlock, pLimitInfo->remainOffset);
286 287 288 289 290 291
      pLimitInfo->remainOffset = 0;
    }
  }

  if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
    // limit the output rows
292
    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
293
    blockDataKeepFirstNRows(pBlock, keep);
294 295

    pLimitInfo->numOfOutputRows += pBlock->info.rows;
H
Haojun Liao 已提交
296
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
297
    return true;
298
  }
299

300
  pLimitInfo->numOfOutputRows += pBlock->info.rows;
301
  return false;
302 303
}

H
Haojun Liao 已提交
304
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
L
Liu Jicong 已提交
305
                             uint32_t* status) {
S
slzhou 已提交
306
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
307
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
H
Haojun Liao 已提交
308 309

  pCost->totalBlocks += 1;
310
  pCost->totalRows += pBlock->info.rows;
311

H
Haojun Liao 已提交
312
  bool loadSMA = false;
H
Haojun Liao 已提交
313
  *status = pTableScanInfo->dataBlockLoadFlag;
H
Haojun Liao 已提交
314
  if (pOperator->exprSupp.pFilterInfo != NULL ||
315
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
316 317 318 319
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
320
  taosMemoryFreeClear(pBlock->pBlockAgg);
321 322

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
X
Xiaoyu Wang 已提交
323
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
324
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
325
    pCost->filterOutBlocks += 1;
326
    pCost->totalRows += pBlock->info.rows;
327
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
328 329
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
X
Xiaoyu Wang 已提交
330 331 332
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows,
           pBlockInfo->id.uid);
G
Ganlin Zhao 已提交
333
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
334
    pCost->skipBlocks += 1;
335
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
336
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
337
  } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
338
    pCost->loadBlockStatis += 1;
L
Liu Jicong 已提交
339
    loadSMA = true;  // mark the operation of load sma;
H
Haojun Liao 已提交
340
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
L
Liu Jicong 已提交
341
    if (success) {  // failed to load the block sma data, data block statistics does not exist, load data block instead
X
Xiaoyu Wang 已提交
342
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
343
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
G
Ganlin Zhao 已提交
344
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
345
      tsdbReleaseDataBlock(pTableScanInfo->dataReader);
346 347
      return TSDB_CODE_SUCCESS;
    } else {
348
      qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
349
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
350
    }
H
Haojun Liao 已提交
351
  }
352

H
Haojun Liao 已提交
353
  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
354

H
Haojun Liao 已提交
355
  // try to filter data block according to sma info
H
Haojun Liao 已提交
356
  if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
357 358 359
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      size_t size = taosArrayGetSize(pBlock->pDataBlock);
H
Haojun Liao 已提交
360
      bool   keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
361
      if (!keep) {
X
Xiaoyu Wang 已提交
362 363
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
               GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
364 365 366
        pCost->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

367
        tsdbReleaseDataBlock(pTableScanInfo->dataReader);
368 369
        return TSDB_CODE_SUCCESS;
      }
370
    }
H
Haojun Liao 已提交
371
  }
372

373 374 375
  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

376
  // try to filter data block according to current results
377 378
  doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
X
Xiaoyu Wang 已提交
379 380
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
381
    pCost->skipBlocks += 1;
382
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
383
    *status = FUNC_DATA_REQUIRED_FILTEROUT;
384 385 386
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
387 388
  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;
389

H
Haojun Liao 已提交
390 391
  SSDataBlock* p = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (p == NULL) {
H
Haojun Liao 已提交
392
    return terrno;
H
Haojun Liao 已提交
393 394
  }

H
Haojun Liao 已提交
395
  ASSERT(p == pBlock);
396
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
397

H
Haojun Liao 已提交
398 399
  // restore the previous value
  pCost->totalRows -= pBlock->info.rows;
400

H
Haojun Liao 已提交
401
  if (pOperator->exprSupp.pFilterInfo != NULL) {
402
    int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
403
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
404

405 406
    double el = (taosGetTimestampUs() - st) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;
407

408 409
    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
D
dapan1121 已提交
410
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
411 412 413 414
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
415 416
  }

417
  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
X
Xiaoyu Wang 已提交
418
  if (limitReached) {  // set operator flag is done
419 420
    setOperatorCompleted(pOperator);
  }
421

H
Haojun Liao 已提交
422
  pCost->totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
423 424 425
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
426
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
H
Haojun Liao 已提交
427 428 429
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);

  switchCtxOrder(pCtx, numOfOutput);
430
  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
H
Haojun Liao 已提交
431 432
  STimeWindow* pTWindow = &pTableScanInfo->cond.twindows;
  TSWAP(pTWindow->skey, pTWindow->ekey);
H
Haojun Liao 已提交
433 434
}

435 436
typedef struct STableCachedVal {
  const char* pName;
437
  STag*       pTags;
438 439
} STableCachedVal;

440 441 442 443 444 445 446 447 448 449 450
static void freeTableCachedVal(void* param) {
  if (param == NULL) {
    return;
  }

  STableCachedVal* pVal = param;
  taosMemoryFree((void*)pVal->pName);
  taosMemoryFree(pVal->pTags);
  taosMemoryFree(pVal);
}

H
Haojun Liao 已提交
451 452
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
453
  pVal->pName = taosStrdup(pMetaReader->me.name);
H
Haojun Liao 已提交
454 455 456 457
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
458
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
459 460 461 462 463 464 465
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

466 467
// const void *key, size_t keyLen, void *value
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) { freeTableCachedVal(value); }
468

469 470 471 472 473
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
  for (int32_t j = 0; j < numOfExpr; ++j) {
    int32_t dstSlotId = pExpr[j].base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
474
    colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
475 476 477
  }
}

478 479
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
480
  // currently only the tbname pseudo column
481
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
482
    return TSDB_CODE_SUCCESS;
483 484
  }

485 486
  int32_t code = 0;

487 488 489 490
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

491
  bool            freeReader = false;
492
  STableCachedVal val = {0};
493 494

  SMetaReader mr = {0};
495
  LRUHandle*  h = NULL;
496

497 498 499
  // todo refactor: extract method
  // the handling of the null data should be packed in the extracted method

500
  // 1. check if it is existed in meta cache
501
  if (pCache == NULL) {
502
    metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
503
    code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
504
    if (code != TSDB_CODE_SUCCESS) {
505
      // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
H
Haojun Liao 已提交
506
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
S
slzhou 已提交
507 508
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);
509 510 511

        // append null value before return to caller, since the caller will ignore this error code and proceed
        doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
512
      } else {
S
slzhou 已提交
513 514
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
H
Haojun Liao 已提交
515
      }
516 517 518 519 520
      metaReaderClear(&mr);
      return terrno;
    }

    metaReaderReleaseLock(&mr);
521

522 523
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
524 525

    freeReader = true;
526
  } else {
527 528
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
529
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
530 531
    if (h == NULL) {
      metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
532
      code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
533
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
534
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
535
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
536
                pBlock->info.id.uid, tstrerror(terrno), idStr);
537 538
          // append null value before return to caller, since the caller will ignore this error code and proceed
          doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
539
        } else {
H
Haojun Liao 已提交
540
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
541
                 idStr);
H
Haojun Liao 已提交
542
        }
543 544 545 546 547 548
        metaReaderClear(&mr);
        return terrno;
      }

      metaReaderReleaseLock(&mr);

H
Haojun Liao 已提交
549
      STableCachedVal* pVal = createTableCacheVal(&mr);
550

H
Haojun Liao 已提交
551
      val = *pVal;
552
      freeReader = true;
H
Haojun Liao 已提交
553

H
Haojun Liao 已提交
554
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
555
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
556 557 558 559 560 561 562 563
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
564

H
Haojun Liao 已提交
565
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
566
    }
H
Haojun Liao 已提交
567

568 569
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
570
  }
571

572 573
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
574
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
575 576

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
577
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
578

579
    int32_t functionId = pExpr1->pExpr->_function.functionId;
580 581 582

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
583
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
584
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
585
      STagVal tagVal = {0};
586 587
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
588

589 590 591 592
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
593
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
594
      }
595

H
Haojun Liao 已提交
596 597
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
598
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
599
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
D
dapan1121 已提交
600
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
H
Haojun Liao 已提交
601 602 603
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
D
dapan1121 已提交
604 605 606 607 608 609
        if (code) {
          if (freeReader) {
            metaReaderClear(&mr);
          }
          return code;
        }
L
Liu Jicong 已提交
610
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
611
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
612
          colDataSetVal(pColInfoData, i, data, false);
H
Haojun Liao 已提交
613
        }
614 615 616 617
      }
    }
  }

618 619
  // restore the rows
  pBlock->info.rows = backupRows;
620 621 622 623
  if (freeReader) {
    metaReaderClear(&mr);
  }

H
Haojun Liao 已提交
624
  return TSDB_CODE_SUCCESS;
625 626
}

H
Haojun Liao 已提交
627
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
628 629 630
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

H
Haojun Liao 已提交
631
  size_t len = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
632
  char   buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
H
Haojun Liao 已提交
633 634 635
  STR_TO_VARSTR(buf, name)

  SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, len, 1);
636

H
Haojun Liao 已提交
637
  colInfoDataEnsureCapacity(&infoData, 1, false);
638
  colDataSetVal(&infoData, 0, buf, false);
639

H
Haojun Liao 已提交
640
  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData};
641
  SScalarParam param = {.columnData = pColInfoData};
H
Haojun Liao 已提交
642 643 644 645 646 647 648

  if (fpSet.process != NULL) {
    fpSet.process(&srcParam, 1, &param);
  } else {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  }

D
dapan1121 已提交
649
  colDataDestroy(&infoData);
650 651
}

652
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
653
  STableScanInfo* pTableScanInfo = pOperator->info;
654
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
L
Liu Jicong 已提交
655
  SSDataBlock*    pBlock = pTableScanInfo->pResBlock;
D
dapan1121 已提交
656 657
  bool            hasNext = false;
  int32_t         code = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
658

659 660
  int64_t st = taosGetTimestampUs();

D
dapan1121 已提交
661 662 663 664 665 666 667 668 669 670
  while (true) {
    code = tsdbNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
    if (code) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
671

672
    if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
673
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
674
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
675
    }
H
Haojun Liao 已提交
676

677
    if (pOperator->status == OP_EXEC_DONE) {
X
Xiaoyu Wang 已提交
678
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
679 680 681
      break;
    }

682 683 684 685 686 687
    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

D
dapan1121 已提交
688
    if (pBlock->info.id.uid) {
689
      pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
D
dapan1121 已提交
690
    }
691

692
    uint32_t status = 0;
693
    code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
694
    if (code != TSDB_CODE_SUCCESS) {
695
      T_LONG_JMP(pTaskInfo->env, code);
696
    }
697

698 699 700
    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
701
    }
702

H
Haojun Liao 已提交
703 704
    pOperator->resultInfo.totalRows = pTableScanInfo->base.readRecorder.totalRows;
    pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
705

H
Haojun Liao 已提交
706
    pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;
707 708

    // todo refactor
H
Haojun Liao 已提交
709
    /*pTableScanInfo->lastStatus.uid = pBlock->info.id.uid;*/
L
Liu Jicong 已提交
710
    /*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/
X
Xiaoyu Wang 已提交
711 712 713
    //    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
    //    pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
    //    pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
714

715
    return pBlock;
H
Haojun Liao 已提交
716 717 718 719
  }
  return NULL;
}

H
Haojun Liao 已提交
720
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
721 722 723 724
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
H
Haojun Liao 已提交
725
  if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
H
Haojun Liao 已提交
726 727 728
    return NULL;
  }

729 730
  // do the ascending order traverse in the first place.
  while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
H
Haojun Liao 已提交
731 732 733
    SSDataBlock* p = doTableScanImpl(pOperator);
    if (p != NULL) {
      return p;
H
Haojun Liao 已提交
734 735
    }

736
    pTableScanInfo->scanTimes += 1;
737

738
    if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
739
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
G
Ganlin Zhao 已提交
740 741
      pTableScanInfo->base.scanFlag = MAIN_SCAN;
      pTableScanInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
742
      qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
743

744
      // do prepare for the next round table scan operation
H
Haojun Liao 已提交
745
      tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
H
Haojun Liao 已提交
746
    }
747
  }
H
Haojun Liao 已提交
748

749
  int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
750
  if (pTableScanInfo->scanTimes < total) {
H
Haojun Liao 已提交
751 752 753
    if (pTableScanInfo->base.cond.order == TSDB_ORDER_ASC) {
      prepareForDescendingScan(&pTableScanInfo->base, pOperator->exprSupp.pCtx, 0);
      tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
754
      qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
755
    }
H
Haojun Liao 已提交
756

757
    while (pTableScanInfo->scanTimes < total) {
H
Haojun Liao 已提交
758 759 760
      SSDataBlock* p = doTableScanImpl(pOperator);
      if (p != NULL) {
        return p;
761
      }
H
Haojun Liao 已提交
762

763
      pTableScanInfo->scanTimes += 1;
H
Haojun Liao 已提交
764

765
      if (pTableScanInfo->scanTimes < total) {
766
        setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
G
Ganlin Zhao 已提交
767
        pTableScanInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
768

769
        qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
770
        tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
771
      }
H
Haojun Liao 已提交
772 773 774
    }
  }

wmmhello's avatar
wmmhello 已提交
775 776 777 778 779 780 781
  return NULL;
}

static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

782
  // scan table one by one sequentially
L
Liu Jicong 已提交
783
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
X
Xiaoyu Wang 已提交
784
    int32_t       numOfTables = 0;  // tableListGetSize(pTaskInfo->pTableListInfo);
785
    STableKeyInfo tInfo = {0};
H
Haojun Liao 已提交
786

L
Liu Jicong 已提交
787
    while (1) {
H
Haojun Liao 已提交
788
      SSDataBlock* result = doGroupedTableScan(pOperator);
H
Haojun Liao 已提交
789
      if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
L
Liu Jicong 已提交
790 791
        return result;
      }
H
Haojun Liao 已提交
792

L
Liu Jicong 已提交
793 794
      // if no data, switch to next table and continue scan
      pInfo->currentTable++;
795 796

      taosRLockLatch(&pTaskInfo->lock);
797
      numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
798

H
Haojun Liao 已提交
799
      if (pInfo->currentTable >= numOfTables) {
H
Haojun Liao 已提交
800
        qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
801
        taosRUnLockLatch(&pTaskInfo->lock);
L
Liu Jicong 已提交
802 803
        return NULL;
      }
H
Haojun Liao 已提交
804

X
Xiaoyu Wang 已提交
805
      tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
806 807 808 809
      taosRUnLockLatch(&pTaskInfo->lock);

      tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
H
Haojun Liao 已提交
810
             pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
811

H
Haojun Liao 已提交
812
      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
L
Liu Jicong 已提交
813 814
      pInfo->scanTimes = 0;
    }
815 816
  } else {  // scan table group by group sequentially
    if (pInfo->currentGroupId == -1) {
817
      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
H
Haojun Liao 已提交
818
        setOperatorCompleted(pOperator);
819 820
        return NULL;
      }
821

5
54liuyao 已提交
822
      int32_t        num = 0;
823
      STableKeyInfo* pList = NULL;
824
      tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
H
Haojun Liao 已提交
825
      ASSERT(pInfo->base.dataReader == NULL);
826

L
Liu Jicong 已提交
827
      int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
D
dapan1121 已提交
828
                                    (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly);
829 830 831
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }
832 833 834 835

      if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
        pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
      }
wmmhello's avatar
wmmhello 已提交
836
    }
H
Haojun Liao 已提交
837

H
Haojun Liao 已提交
838
    SSDataBlock* result = doGroupedTableScan(pOperator);
839 840 841
    if (result != NULL) {
      return result;
    }
H
Haojun Liao 已提交
842

843
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
H
Haojun Liao 已提交
844
      setOperatorCompleted(pOperator);
845 846
      return NULL;
    }
wmmhello's avatar
wmmhello 已提交
847

848 849
    // reset value for the next group data output
    pOperator->status = OP_OPENED;
850
    resetLimitInfoForNextGroup(&pInfo->base.limitInfo);
wmmhello's avatar
wmmhello 已提交
851

5
54liuyao 已提交
852
    int32_t        num = 0;
853
    STableKeyInfo* pList = NULL;
854
    tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
wmmhello's avatar
wmmhello 已提交
855

H
Haojun Liao 已提交
856 857
    tsdbSetTableList(pInfo->base.dataReader, pList, num);
    tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
858
    pInfo->scanTimes = 0;
wmmhello's avatar
wmmhello 已提交
859

H
Haojun Liao 已提交
860
    result = doGroupedTableScan(pOperator);
861 862 863
    if (result != NULL) {
      return result;
    }
864

H
Haojun Liao 已提交
865
    setOperatorCompleted(pOperator);
866 867
    return NULL;
  }
H
Haojun Liao 已提交
868 869
}

870 871
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
872
  STableScanInfo*         pTableScanInfo = pOptr->info;
H
Haojun Liao 已提交
873
  *pRecorder = pTableScanInfo->base.readRecorder;
874 875 876 877 878
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

879 880
static void destroyTableScanBase(STableScanBase* pBase) {
  cleanupQueryTableDataCond(&pBase->cond);
H
Haojun Liao 已提交
881

882 883
  tsdbReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;
884

885 886
  if (pBase->matchInfo.pList != NULL) {
    taosArrayDestroy(pBase->matchInfo.pList);
887
  }
L
Liu Jicong 已提交
888

889
  tableListDestroy(pBase->pTableListInfo);
890 891 892 893 894 895 896 897
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}

static void destroyTableScanOperatorInfo(void* param) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  blockDataDestroy(pTableScanInfo->pResBlock);
  destroyTableScanBase(&pTableScanInfo->base);
D
dapan1121 已提交
898
  taosMemoryFreeClear(param);
899 900
}

901
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
902
                                           STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
X
Xiaoyu Wang 已提交
903
  int32_t         code = 0;
H
Haojun Liao 已提交
904 905 906
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
907
    code = TSDB_CODE_OUT_OF_MEMORY;
908
    goto _error;
H
Haojun Liao 已提交
909 910
  }

911
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
912
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
913 914

  int32_t numOfCols = 0;
X
Xiaoyu Wang 已提交
915 916
  code =
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
917 918 919 920
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
921
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
922
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
923
  if (code != TSDB_CODE_SUCCESS) {
924
    goto _error;
925 926
  }

H
Haojun Liao 已提交
927
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
928
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
929
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
930
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
931 932
  }

933
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
G
Ganlin Zhao 已提交
934
  pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
H
Haojun Liao 已提交
935

H
Haojun Liao 已提交
936 937
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
938 939
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

940 941
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
942

H
Haojun Liao 已提交
943
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
944
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
X
Xiaoyu Wang 已提交
945
  //  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
946

H
Haojun Liao 已提交
947 948 949
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
950 951
  }

wmmhello's avatar
wmmhello 已提交
952
  pInfo->currentGroupId = -1;
953
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
954
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
955

L
Liu Jicong 已提交
956 957
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
958
  pOperator->exprSupp.numOfExprs = numOfCols;
959

960
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
961 962
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
963 964 965
    code = terrno;
    goto _error;
  }
966

D
dapan1121 已提交
967 968 969 970
  if (scanDebug) {
    pInfo->countOnly = true;
  }

H
Haojun Liao 已提交
971
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
972 973
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
974 975 976

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
977
  return pOperator;
978

979
_error:
980 981 982
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
983

984 985
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
986
  return NULL;
H
Haojun Liao 已提交
987 988
}

989
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
990
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
L
Liu Jicong 已提交
991
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
992

H
Haojun Liao 已提交
993
  pInfo->base.dataReader = pReadHandle;
L
Liu Jicong 已提交
994
  //  pInfo->prevGroupId       = -1;
H
Haojun Liao 已提交
995

L
Liu Jicong 已提交
996 997
  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
998
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
999 1000 1001
  return pOperator;
}

1002
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
5
54liuyao 已提交
1003
  qDebug("clear buff blocks:%d", (int32_t)taosArrayGetSize(pInfo->pBlockLists));
L
Liu Jicong 已提交
1004 1005
  taosArrayClear(pInfo->pBlockLists);
  pInfo->validBlockIndex = 0;
H
Haojun Liao 已提交
1006 1007
}

1008
static bool isSessionWindow(SStreamScanInfo* pInfo) {
H
Haojun Liao 已提交
1009
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
5
54liuyao 已提交
1010 1011
}

1012
static bool isStateWindow(SStreamScanInfo* pInfo) {
1013
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
5
54liuyao 已提交
1014
}
5
54liuyao 已提交
1015

L
Liu Jicong 已提交
1016
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
1017 1018 1019
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
         pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
         pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
5
54liuyao 已提交
1020 1021 1022
}

static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
1023
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
L
Liu Jicong 已提交
1024 1025
}

1026 1027 1028 1029
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  return isIntervalWindow(pInfo) && pInfo->interval.interval != pInfo->interval.sliding;
}

1030
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
1031 1032
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  uint64_t*        groupCol = (uint64_t*)pColInfo->pData;
1033
  ASSERT(rowIndex < pBlock->info.rows);
1034
  pInfo->groupId = groupCol[rowIndex];
1035 1036
}

L
Liu Jicong 已提交
1037
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
H
Haojun Liao 已提交
1038
  pTableScanInfo->base.cond.twindows = *pWin;
L
Liu Jicong 已提交
1039 1040
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;
H
Haojun Liao 已提交
1041
  tsdbReaderClose(pTableScanInfo->base.dataReader);
H
Haojun Liao 已提交
1042
  qDebug("1");
H
Haojun Liao 已提交
1043
  pTableScanInfo->base.dataReader = NULL;
1044 1045
}

L
Liu Jicong 已提交
1046 1047
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
1048
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
1049

1050
  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
H
Haojun Liao 已提交
1051
  SQueryTableDataCond cond = pTableScanInfo->base.cond;
1052 1053 1054 1055 1056 1057 1058 1059 1060

  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  STsdbReader* pReader = NULL;
L
Liu Jicong 已提交
1061
  int32_t      code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
D
dapan1121 已提交
1062
                                     (STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false);
1063 1064
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
dengyihao's avatar
dengyihao 已提交
1065
    T_LONG_JMP(pTaskInfo->env, code);
1066 1067 1068
    return NULL;
  }

D
dapan1121 已提交
1069 1070 1071 1072 1073 1074 1075 1076 1077
  bool hasNext = false;
  code = tsdbNextDataBlock(pReader, &hasNext);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  if (hasNext) {
L
Liu Jicong 已提交
1078
    /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
H
Haojun Liao 已提交
1079
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
1080
    pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
1081 1082 1083
  }

  tsdbReaderClose(pReader);
D
dapan1121 已提交
1084
  qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
5
54liuyao 已提交
1085 1086
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
1087 1088

  return pBlock->info.rows > 0 ? pBlock : NULL;
1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099
}

static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (!pPreRes || pPreRes->info.rows == 0) {
    return 0;
  }
  ASSERT(pPreRes->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, 0);
}

5
54liuyao 已提交
1100
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
1101
  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
1102
  return getTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
1103 1104
}

5
54liuyao 已提交
1105 1106 1107 1108 1109 1110 1111 1112
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  if (pInfo->partitionSup.needCalc) {
    return getGroupIdByCol(pInfo, uid, ts, maxVersion);
  }

  return getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1113
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
5
54liuyao 已提交
1114 1115 1116
  if (pBlock->info.rows == 0) {
    return false;
  }
L
Liu Jicong 已提交
1117 1118 1119 1120 1121 1122 1123 1124 1125 1126
  if ((*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  STimeWindow      win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
1127 1128 1129
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        gpData = (uint64_t*)pGpCol->pData;
  uint64_t         groupId = gpData[*pRowIndex];
1130 1131 1132 1133 1134 1135

  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSKEY*           calStartData = (TSKEY*)pCalStartTsCol->pData;
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSKEY*           calEndData = (TSKEY*)pCalEndTsCol->pData;

L
Liu Jicong 已提交
1136
  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
1137 1138 1139 1140
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[*pRowIndex];
    pInfo->updateWin.ekey = calEndData[*pRowIndex];
  }
L
Liu Jicong 已提交
1141 1142 1143
  (*pRowIndex)++;

  for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
1144
    if (win.skey == startData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
L
Liu Jicong 已提交
1145 1146 1147
      win.ekey = TMAX(win.ekey, endData[*pRowIndex]);
      continue;
    }
1148
    if (win.skey == endData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
L
Liu Jicong 已提交
1149 1150 1151
      win.skey = TMIN(win.skey, startData[*pRowIndex]);
      continue;
    }
1152 1153
    ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
           !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
L
Liu Jicong 已提交
1154 1155 1156 1157
    break;
  }

  resetTableScanInfo(pInfo->pTableScanOp->info, &win);
1158
  pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1159 1160 1161
  return true;
}

5
54liuyao 已提交
1162
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1163
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1164
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1165
  dumyInfo.cur.pageId = -1;
1166
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1167 1168
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1169
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1170

5
54liuyao 已提交
1171
  while (1) {
1172 1173 1174
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1175
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1176 1177 1178 1179 1180
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1181
    }
5
54liuyao 已提交
1182

5
54liuyao 已提交
1183 1184 1185
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1186
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1187
    endWin = preWin;
5
54liuyao 已提交
1188
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1189 1190 1191 1192 1193 1194
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1195

L
Liu Jicong 已提交
1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  while (1) {
    SSDataBlock* pResult = NULL;
    pResult = doTableScan(pInfo->pTableScanOp);
    if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
      // scan next window data
      pResult = doTableScan(pInfo->pTableScanOp);
    }
    if (!pResult) {
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
5
54liuyao 已提交
1207
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
H
Hongze Cheng 已提交
1208
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1209
      tsdbReaderClose(pTableScanInfo->base.dataReader);
H
Haojun Liao 已提交
1210
      qDebug("2");
H
Haojun Liao 已提交
1211
      pTableScanInfo->base.dataReader = NULL;
1212 1213
      return NULL;
    }
L
Liu Jicong 已提交
1214

H
Haojun Liao 已提交
1215
    doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
1216 1217 1218 1219
    if (pResult->info.rows == 0) {
      continue;
    }

1220 1221 1222 1223 1224 1225 1226 1227
    if (pInfo->partitionSup.needCalc) {
      SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
      blockDataCleanup(pResult);
      for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
        if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, tmpBlock, i) == pInfo->groupId) {
          for (int32_t j = 0; j < pInfo->pTableScanOp->exprSupp.numOfExprs; j++) {
            SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
            SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
L
Liu Jicong 已提交
1228 1229
            bool             isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
            char*            pSrcData = colDataGetData(pSrcCol, i);
1230
            colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull);
1231 1232 1233 1234
          }
          pResult->info.rows++;
        }
      }
H
Haojun Liao 已提交
1235 1236 1237

      blockDataDestroy(tmpBlock);

1238 1239 1240 1241
      if (pResult->info.rows > 0) {
        pResult->info.calWin = pInfo->updateWin;
        return pResult;
      }
H
Haojun Liao 已提交
1242
    } else if (pResult->info.id.groupId == pInfo->groupId) {
5
54liuyao 已提交
1243
      pResult->info.calWin = pInfo->updateWin;
1244
      return pResult;
5
54liuyao 已提交
1245 1246
    }
  }
1247
}
1248

1249
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
X
Xiaoyu Wang 已提交
1250
                                   SSessionKey* pKey) {
1251 1252 1253
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
X
Xiaoyu Wang 已提交
1254

1255 1256 1257 1258 1259
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
  int32_t          code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
H
Haojun Liao 已提交
1260 1261

  taosMemoryFree(pCur);
1262 1263 1264
  return code;
}

1265
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
5
54liuyao 已提交
1266
  blockDataCleanup(pDestBlock);
1267 1268
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
1269
  }
1270
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
1271
  if (code != TSDB_CODE_SUCCESS) {
1272
    return code;
L
Liu Jicong 已提交
1273
  }
1274 1275
  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
1276
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
1277
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
1278
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
1279 1280
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
L
Liu Jicong 已提交
1281

1282 1283
  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
5
54liuyao 已提交
1284
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
1285
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
5
54liuyao 已提交
1286 1287
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
1288
  int64_t          version = pSrcBlock->info.version - 1;
1289
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
1290
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
L
Liu Jicong 已提交
1291
    // gap must be 0.
5
54liuyao 已提交
1292
    SSessionKey startWin = {0};
1293
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
5
54liuyao 已提交
1294
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
L
Liu Jicong 已提交
1295 1296 1297
      // window has been closed.
      continue;
    }
5
54liuyao 已提交
1298 1299
    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
X
Xiaoyu Wang 已提交
1300
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
1301 1302 1303 1304
      getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    }
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
X
Xiaoyu Wang 已提交
1305
      qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
1306 1307
      continue;
    }
1308 1309
    colDataSetVal(pDestStartCol, i, (const char*)&startWin.win.skey, false);
    colDataSetVal(pDestEndCol, i, (const char*)&endWin.win.ekey, false);
5
54liuyao 已提交
1310

1311
    colDataSetNULL(pDestUidCol, i);
1312
    colDataSetVal(pDestGpCol, i, (const char*)&groupId, false);
1313 1314
    colDataSetNULL(pDestCalStartTsCol, i);
    colDataSetNULL(pDestCalEndTsCol, i);
1315
    pDestBlock->info.rows++;
L
Liu Jicong 已提交
1316
  }
1317
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
1318
}
1319 1320 1321 1322 1323 1324

static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
1325
  }
1326

1327 1328
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
1329 1330
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
5
54liuyao 已提交
1331

L
Liu Jicong 已提交
1332
  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
1333
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
5
54liuyao 已提交
1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
    uint64_t     srcUid = srcUidData[0];
    TSKEY        startTs = srcStartTsCol[0];
    TSKEY        endTs = srcEndTsCol[0];
    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);
    printDataBlock(pPreRes, "pre res");
    blockDataCleanup(pSrcBlock);
    int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    rows = pPreRes->info.rows;

    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
      appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
                                       &groupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  srcUidData = (uint64_t*)pSrcUidCol->pData;

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1370 1371
  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
1372
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
1373 1374 1375
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
1376
  for (int32_t i = 0; i < rows;) {
1377
    uint64_t srcUid = srcUidData[i];
5
54liuyao 已提交
1378 1379 1380 1381 1382
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
    TSKEY calStartTs = srcStartTsCol[i];
1383
    colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
5
54liuyao 已提交
1384
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
1385 1386
                                       pInfo->partitionSup.needCalc);
    TSKEY       calEndTs = srcStartTsCol[i - 1];
1387 1388 1389 1390 1391
    colDataSetVal(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataSetVal(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
1392
    pDestBlock->info.rows++;
5
54liuyao 已提交
1393
  }
1394 1395
  return TSDB_CODE_SUCCESS;
}
1396

1397
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
5
54liuyao 已提交
1398 1399 1400
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
1401 1402
    return TSDB_CODE_SUCCESS;
  }
5
54liuyao 已提交
1403
  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
1404 1405 1406 1407
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

5
54liuyao 已提交
1408 1409 1410 1411 1412 1413 1414 1415 1416 1417
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;
1418
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
5
54liuyao 已提交
1419 1420
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
L
Liu Jicong 已提交
1421
    char*    tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
5
54liuyao 已提交
1422 1423 1424
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
L
Liu Jicong 已提交
1425
    if (pInfo->tbnameCalSup.pExprInfo) {
1426 1427 1428
      void* parTbname = NULL;
      streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);

L
Liu Jicong 已提交
1429 1430
      memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
      varDataSetLen(tbname, strlen(varDataVal(tbname)));
L
Liu Jicong 已提交
1431
      tdbFree(parTbname);
L
Liu Jicong 已提交
1432 1433 1434
    }
    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     tbname[0] == 0 ? NULL : tbname);
1435 1436 1437 1438
  }
  return TSDB_CODE_SUCCESS;
}

1439 1440 1441 1442
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (isIntervalWindow(pInfo)) {
    code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
1443
  } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
1444
    code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
5
54liuyao 已提交
1445 1446
  } else {
    code = generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
1447
  }
1448
  pDestBlock->info.type = STREAM_CLEAR;
1449
  pDestBlock->info.version = pSrcBlock->info.version;
1450
  pDestBlock->info.dataLoad = 1;
1451 1452 1453 1454
  blockDataUpdateTsWindow(pDestBlock, 0);
  return code;
}

L
Liu Jicong 已提交
1455 1456 1457
#if 0
void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp*    pTagCalSup = &pInfo->tagCalSup;
1458
  SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
L
Liu Jicong 已提交
1459
  if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return;
L
Liu Jicong 已提交
1460
  if (pBlock == NULL || pBlock->info.rows == 0) return;
1461

L
Liu Jicong 已提交
1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477
  void*   tag = NULL;
  int32_t tagLen = 0;
  if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) == 0) {
    pBlock->info.tagLen = tagLen;
    void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen);
    if (pTag == NULL) {
      tdbFree(tag);
      taosMemoryFree(pBlock->info.pTag);
      pBlock->info.pTag = NULL;
      pBlock->info.tagLen = 0;
      return;
    }
    pBlock->info.pTag = pTag;
    memcpy(pBlock->info.pTag, tag, tagLen);
    tdbFree(tag);
    return;
L
Liu Jicong 已提交
1478
  } else {
L
Liu Jicong 已提交
1479
    pBlock->info.pTag = NULL;
L
Liu Jicong 已提交
1480
  }
L
Liu Jicong 已提交
1481 1482 1483
  tdbFree(tag);
}
#endif
L
Liu Jicong 已提交
1484

5
54liuyao 已提交
1485
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
1486 1487
  SExprSupp*    pTbNameCalSup = &pInfo->tbnameCalSup;
  SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
5
54liuyao 已提交
1488 1489
  blockDataCleanup(pInfo->pCreateTbRes);
  if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
L
Liu Jicong 已提交
1490
    pBlock->info.parTbName[0] = 0;
L
Liu Jicong 已提交
1491
  } else {
5
54liuyao 已提交
1492 1493
    appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
                         pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes);
L
Liu Jicong 已提交
1494
  }
L
Liu Jicong 已提交
1495 1496
}

1497 1498
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
1499 1500
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
1501 1502
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
1503 1504
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
1505
  SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
1506 1507 1508 1509 1510 1511 1512
  colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
  colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
  colDataSetVal(pUidCol, pBlock->info.rows, (const char*)pUid, false);
  colDataSetVal(pGpCol, pBlock->info.rows, (const char*)pGp, false);
  colDataSetVal(pCalStartCol, pBlock->info.rows, (const char*)pStartTs, false);
  colDataSetVal(pCalEndCol, pBlock->info.rows, (const char*)pEndTs, false);
  colDataSetVal(pTableCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL);
1513
  pBlock->info.rows++;
5
54liuyao 已提交
1514 1515
}

1516
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
1517 1518
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
1519
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
1520
  }
1521 1522
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
5
54liuyao 已提交
1523
  TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
H
Haojun Liao 已提交
1524
  bool   tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);
1525
  for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
5
54liuyao 已提交
1526 1527
    SResultRowInfo dumyInfo;
    dumyInfo.cur.pageId = -1;
L
Liu Jicong 已提交
1528
    bool        isClosed = false;
5
54liuyao 已提交
1529
    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
X
Xiaoyu Wang 已提交
1530
    bool        overDue = isOverdue(tsCol[rowId], &pInfo->twAggSup);
1531 1532 1533 1534 1535
    if (pInfo->igExpired && overDue) {
      continue;
    }

    if (tableInserted && overDue) {
5
54liuyao 已提交
1536 1537 1538
      win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }
5
54liuyao 已提交
1539
    // must check update info first.
H
Haojun Liao 已提交
1540
    bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]);
L
Liu Jicong 已提交
1541
    bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
H
Haojun Liao 已提交
1542
                     isDeletedStreamWindow(&win, pBlock->info.id.groupId,
1543
                                           pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup);
L
Liu Jicong 已提交
1544
    if ((update || closedWin) && out) {
L
Liu Jicong 已提交
1545
      qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
5
54liuyao 已提交
1546
      uint64_t gpId = 0;
H
Haojun Liao 已提交
1547
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId,
1548
                                       NULL);
5
54liuyao 已提交
1549 1550
      if (closedWin && pInfo->partitionSup.needCalc) {
        gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
S
slzhou 已提交
1551 1552
        appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid,
                                         &gpId, NULL);
5
54liuyao 已提交
1553
      }
1554 1555
    }
  }
1556 1557
  if (out && pInfo->pUpdateDataRes->info.rows > 0) {
    pInfo->pUpdateDataRes->info.version = pBlock->info.version;
1558
    pInfo->pUpdateDataRes->info.dataLoad = 1;
1559
    blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
1560
    pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
5
54liuyao 已提交
1561 1562
  }
}
L
Liu Jicong 已提交
1563

1564
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
L
Liu Jicong 已提交
1565 1566
  SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
L
Liu Jicong 已提交
1567
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
L
Liu Jicong 已提交
1568

1569 1570
  blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);

L
Liu Jicong 已提交
1571
  pInfo->pRes->info.rows = pBlock->info.rows;
H
Haojun Liao 已提交
1572
  pInfo->pRes->info.id.uid = pBlock->info.id.uid;
L
Liu Jicong 已提交
1573
  pInfo->pRes->info.type = STREAM_NORMAL;
1574
  pInfo->pRes->info.version = pBlock->info.version;
L
Liu Jicong 已提交
1575

1576
  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
1577
  pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
L
Liu Jicong 已提交
1578 1579

  // todo extract method
H
Haojun Liao 已提交
1580 1581 1582
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
L
Liu Jicong 已提交
1583 1584 1585 1586 1587 1588 1589
      continue;
    }

    bool colExists = false;
    for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
      SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
      if (pResCol->info.colId == pColMatchInfo->colId) {
H
Haojun Liao 已提交
1590
        SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
1591
        colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
L
Liu Jicong 已提交
1592 1593 1594 1595 1596 1597 1598
        colExists = true;
        break;
      }
    }

    // the required column does not exists in submit block, let's set it to be all null value
    if (!colExists) {
H
Haojun Liao 已提交
1599
      SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
1600
      colDataSetNNULL(pDst, 0, pBlockInfo->rows);
L
Liu Jicong 已提交
1601 1602 1603 1604 1605
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
L
Liu Jicong 已提交
1606
    int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
1607
                                          pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
K
kailixu 已提交
1608 1609
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
L
Liu Jicong 已提交
1610
      blockDataFreeRes((SSDataBlock*)pBlock);
1611
      T_LONG_JMP(pTaskInfo->env, code);
H
Haojun Liao 已提交
1612
    }
K
kailixu 已提交
1613 1614 1615

    // reset the error code.
    terrno = 0;
L
Liu Jicong 已提交
1616 1617
  }

1618
  if (filter) {
H
Haojun Liao 已提交
1619
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1620
  }
1621

1622
  pInfo->pRes->info.dataLoad = 1;
L
Liu Jicong 已提交
1623
  blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
L
Liu Jicong 已提交
1624
  blockDataFreeRes((SSDataBlock*)pBlock);
L
Liu Jicong 已提交
1625

L
Liu Jicong 已提交
1626
  calBlockTbName(pInfo, pInfo->pRes);
L
Liu Jicong 已提交
1627 1628
  return 0;
}
5
54liuyao 已提交
1629

L
Liu Jicong 已提交
1630
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
1631 1632
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;
X
Xiaoyu Wang 已提交
1633
  const char*      id = GET_TASKID(pTaskInfo);
H
Haojun Liao 已提交
1634

1635
  qDebug("start to exec queue scan, %s", id);
L
Liu Jicong 已提交
1636

L
Liu Jicong 已提交
1637
  if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
1638
    if (pInfo->tqReader->msg2.msgStr == NULL) {
L
Liu Jicong 已提交
1639
      SPackedData submit = pTaskInfo->streamInfo.submit;
1640
      if (tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
L
Liu Jicong 已提交
1641
        qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
1642
        return NULL;
L
Liu Jicong 已提交
1643 1644 1645 1646 1647 1648
      }
    }

    blockDataCleanup(pInfo->pRes);
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

1649
    while (tqNextDataBlock(pInfo->tqReader)) {
L
Liu Jicong 已提交
1650 1651
      SSDataBlock block = {0};

1652
      int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
L
Liu Jicong 已提交
1653 1654 1655 1656
      if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
        continue;
      }

1657
      setBlockIntoRes(pInfo, &block, true);
L
Liu Jicong 已提交
1658 1659 1660 1661 1662 1663

      if (pBlockInfo->rows > 0) {
        return pInfo->pRes;
      }
    }

L
Liu Jicong 已提交
1664 1665
    pInfo->tqReader->msg2 = (SPackedData){0};
    pTaskInfo->streamInfo.submit = (SPackedData){0};
L
Liu Jicong 已提交
1666
    return NULL;
L
Liu Jicong 已提交
1667 1668
  }

1669
  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
L
Liu Jicong 已提交
1670 1671
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
X
Xiaoyu Wang 已提交
1672 1673 1674
      qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64,
             pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey,
             pInfo->tqReader->pWalReader->curVersion);
1675
      tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
L
Liu Jicong 已提交
1676
      return pResult;
1677
    }
1678 1679 1680 1681 1682 1683
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;
    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
    if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
      return NULL;
1684
    }
wmmhello's avatar
wmmhello 已提交
1685
    tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
1686 1687
  }

1688
  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
L
Liu Jicong 已提交
1689 1690
    while (1) {
      SFetchRet ret = {0};
1691
      tqNextBlock(pInfo->tqReader, &ret);
X
Xiaoyu Wang 已提交
1692 1693 1694
      tqOffsetResetToLog(
          &pTaskInfo->streamInfo.currentOffset,
          pInfo->tqReader->pWalReader->curVersion - 1);  // curVersion move to next, so currentOffset = curVersion - 1
1695

L
Liu Jicong 已提交
1696
      if (ret.fetchType == FETCH_TYPE__DATA) {
X
Xiaoyu Wang 已提交
1697 1698
        qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, ret.data.info.rows,
               pTaskInfo->streamInfo.currentOffset.version);
L
Liu Jicong 已提交
1699
        blockDataCleanup(pInfo->pRes);
1700
        setBlockIntoRes(pInfo, &ret.data, true);
L
Liu Jicong 已提交
1701
        if (pInfo->pRes->info.rows > 0) {
X
Xiaoyu Wang 已提交
1702 1703
          qDebug("doQueueScan get data from log %" PRId64 " rows, return, version:%" PRId64, pInfo->pRes->info.rows,
                 pTaskInfo->streamInfo.currentOffset.version);
L
Liu Jicong 已提交
1704 1705
          return pInfo->pRes;
        }
X
Xiaoyu Wang 已提交
1706
      } else if (ret.fetchType == FETCH_TYPE__NONE) {
wmmhello's avatar
wmmhello 已提交
1707
        qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
L
Liu Jicong 已提交
1708 1709 1710
        return NULL;
      }
    }
L
Liu Jicong 已提交
1711
  } else {
1712
    qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
L
Liu Jicong 已提交
1713
    return NULL;
H
Haojun Liao 已提交
1714
  }
L
Liu Jicong 已提交
1715 1716
}

L
Liu Jicong 已提交
1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;
  blockDataEnsureCapacity(pDst, rows);

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  uint64_t*        startCol = (uint64_t*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  uint64_t*        endCol = (uint64_t*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  int32_t          j = 0;
  for (int32_t i = 0; i < rows; i++) {
    if (taosHashGet(pReader->tbIdHash, &uidCol[i], sizeof(uint64_t))) {
1735 1736 1737
      colDataSetVal(pDstStartCol, j, (const char*)&startCol[i], false);
      colDataSetVal(pDstEndCol, j, (const char*)&endCol[i], false);
      colDataSetVal(pDstUidCol, j, (const char*)&uidCol[i], false);
L
Liu Jicong 已提交
1738

1739 1740 1741
      colDataSetNULL(taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX), j);
      colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), j);
      colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), j);
L
Liu Jicong 已提交
1742 1743 1744
      j++;
    }
  }
L
Liu Jicong 已提交
1745
  uint32_t cap = pDst->info.capacity;
L
Liu Jicong 已提交
1746 1747
  pDst->info = pSrc->info;
  pDst->info.rows = j;
L
Liu Jicong 已提交
1748
  pDst->info.capacity = cap;
L
Liu Jicong 已提交
1749 1750 1751 1752

  return 0;
}

5
54liuyao 已提交
1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764
// for partition by tag
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startTsCol = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        gpCol = (uint64_t*)pGpCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;
  if (!pInfo->partitionSup.needCalc) {
    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
1765
      colDataSetVal(pGpCol, i, (const char*)&groupId, false);
5
54liuyao 已提交
1766 1767 1768 1769
    }
  }
}

5
54liuyao 已提交
1770
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
5
54liuyao 已提交
1771
  if (pInfo->pUpdateInfo) {
5
54liuyao 已提交
1772
    checkUpdateData(pInfo, true, pBlock, true);
5
54liuyao 已提交
1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
    if (pInfo->pUpdateDataRes->info.rows > 0) {
      pInfo->updateResIndex = 0;
      if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
        pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
        // return pInfo->pUpdateDataRes;
      } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
        pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
      }
5
54liuyao 已提交
1784 1785 1786 1787
    }
  }
}

L
Liu Jicong 已提交
1788 1789 1790 1791 1792
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

L
Liu Jicong 已提交
1793
  qDebug("stream scan called");
H
Haojun Liao 已提交
1794

1795 1796
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
L
Liu Jicong 已提交
1797
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1798
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
1799
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
H
Haojun Liao 已提交
1800 1801 1802 1803
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
      qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1804
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
1805
    } else {
H
Haojun Liao 已提交
1806 1807 1808 1809
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
      qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1810
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
1811
    }
L
Liu Jicong 已提交
1812 1813

    /*resetTableScanInfo(pTSInfo, pWin);*/
H
Haojun Liao 已提交
1814
    tsdbReaderClose(pTSInfo->base.dataReader);
H
Haojun Liao 已提交
1815

H
Haojun Liao 已提交
1816
    pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
1817
    pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1818

L
Liu Jicong 已提交
1819 1820
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
L
Liu Jicong 已提交
1821
    pTaskInfo->streamInfo.recoverScanFinished = false;
L
Liu Jicong 已提交
1822 1823
  }

5
54liuyao 已提交
1824 1825
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
L
Liu Jicong 已提交
1826 1827 1828 1829 1830
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }
5
54liuyao 已提交
1831 1832 1833 1834 1835 1836 1837

    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        printDataBlock(pInfo->pRecoverRes, "scan recover");
        return pInfo->pRecoverRes;
      } break;
5
54liuyao 已提交
1838 1839 1840 1841
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
1842
        printDataBlock(pInfo->pUpdateRes, "recover update");
5
54liuyao 已提交
1843 1844
        return pInfo->pUpdateRes;
      } break;
1845 1846 1847 1848 1849 1850 1851 1852 1853
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        printDataBlock(pInfo->pDeleteDataRes, "recover delete");
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
1854 1855 1856 1857 1858 1859 1860 1861
      case STREAM_SCAN_FROM_DATAREADER_RANGE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
1862
          printDataBlock(pSDB, "scan recover update");
5
54liuyao 已提交
1863 1864 1865 1866 1867 1868
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
5
54liuyao 已提交
1869 1870 1871 1872 1873 1874
      default:
        break;
    }

    pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
    if (pInfo->pRecoverRes != NULL) {
L
Liu Jicong 已提交
1875
      pInfo->blockRecoverContiCnt++;
5
54liuyao 已提交
1876
      calBlockTbName(pInfo, pInfo->pRecoverRes);
1877
      if (pInfo->pUpdateInfo) {
5
54liuyao 已提交
1878 1879 1880 1881 1882 1883
        if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
          TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
        } else {
          doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
        }
1884
      }
5
54liuyao 已提交
1885 1886
      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
1887
        printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
5
54liuyao 已提交
1888 1889
        return pInfo->pCreateTbRes;
      }
X
Xiaoyu Wang 已提交
1890
      qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
5
54liuyao 已提交
1891 1892
      printDataBlock(pInfo->pRecoverRes, "scan recover");
      return pInfo->pRecoverRes;
L
Liu Jicong 已提交
1893 1894
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
L
Liu Jicong 已提交
1895
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1896
    tsdbReaderClose(pTSInfo->base.dataReader);
H
Haojun Liao 已提交
1897

H
Haojun Liao 已提交
1898
    pTSInfo->base.dataReader = NULL;
1899

H
Haojun Liao 已提交
1900 1901
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
L
Liu Jicong 已提交
1902

L
Liu Jicong 已提交
1903
    pTaskInfo->streamInfo.recoverScanFinished = true;
L
Liu Jicong 已提交
1904 1905 1906
    return NULL;
  }

5
54liuyao 已提交
1907
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
5
54liuyao 已提交
1908
// TODO: refactor
L
Liu Jicong 已提交
1909
FETCH_NEXT_BLOCK:
L
Liu Jicong 已提交
1910
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
1911
    if (pInfo->validBlockIndex >= total) {
L
Liu Jicong 已提交
1912
      doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
1913
      /*pOperator->status = OP_EXEC_DONE;*/
H
Haojun Liao 已提交
1914 1915 1916
      return NULL;
    }

1917
    int32_t      current = pInfo->validBlockIndex++;
L
Liu Jicong 已提交
1918 1919
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
5
54liuyao 已提交
1920
    if (pBlock->info.parTbName[0]) {
H
Haojun Liao 已提交
1921
      streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
1922
    }
1923

1924
    // TODO move into scan
5
54liuyao 已提交
1925 1926
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
1927
    pBlock->info.dataLoad = 1;
1928
    blockDataUpdateTsWindow(pBlock, 0);
1929
    switch (pBlock->info.type) {
L
Liu Jicong 已提交
1930 1931 1932
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
1933 1934 1935
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
1936 1937
        copyDataBlock(pInfo->pUpdateRes, pBlock);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
1938 1939 1940
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
1941
        printDataBlock(pBlock, "stream scan delete recv");
L
Liu Jicong 已提交
1942
        SSDataBlock* pDelBlock = NULL;
L
Liu Jicong 已提交
1943
        if (pInfo->tqReader) {
L
Liu Jicong 已提交
1944
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
L
Liu Jicong 已提交
1945
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
L
Liu Jicong 已提交
1946 1947
        } else {
          pDelBlock = pBlock;
L
Liu Jicong 已提交
1948
        }
5
54liuyao 已提交
1949 1950
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
5
54liuyao 已提交
1951 1952 1953 1954 1955 1956
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
1957
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
L
Liu Jicong 已提交
1958
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
1959
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
L
Liu Jicong 已提交
1960
          printDataBlock(pDelBlock, "stream scan delete result");
H
Haojun Liao 已提交
1961 1962
          blockDataDestroy(pDelBlock);

L
Liu Jicong 已提交
1963 1964 1965 1966 1967
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1968 1969 1970
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
L
Liu Jicong 已提交
1971
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
1972 1973 1974
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
L
Liu Jicong 已提交
1975 1976 1977 1978
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
L
Liu Jicong 已提交
1979
          if (pInfo->pDeleteDataRes->info.rows > 0) {
5
54liuyao 已提交
1980
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
L
Liu Jicong 已提交
1981 1982 1983 1984
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1985
        }
1986 1987 1988
      } break;
      default:
        break;
5
54liuyao 已提交
1989
    }
1990
    // printDataBlock(pBlock, "stream scan recv");
1991
    return pBlock;
L
Liu Jicong 已提交
1992
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
L
Liu Jicong 已提交
1993
    qDebug("scan mode %d", pInfo->scanMode);
5
54liuyao 已提交
1994 1995 1996
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
5
54liuyao 已提交
1997
        doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
5
54liuyao 已提交
1998 1999 2000
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
2001 2002 2003
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
5
54liuyao 已提交
2004
      } break;
2005
      case STREAM_SCAN_FROM_DELETE_DATA: {
2006 2007 2008 2009 2010 2011 2012
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
2013 2014 2015 2016 2017 2018 2019 2020 2021 2022
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
2023
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
2024 2025
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
5
54liuyao 已提交
2026 2027
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
2028
          // printDataBlock(pSDB, "stream scan update");
L
Liu Jicong 已提交
2029
          calBlockTbName(pInfo, pSDB);
5
54liuyao 已提交
2030 2031
          return pSDB;
        }
2032
        blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
2033 2034 2035 2036
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
2037
    }
2038

2039
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
5
54liuyao 已提交
2040
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
2041 2042
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
5
54liuyao 已提交
2043 2044
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
2045
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
2046
      pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
2047
      return pInfo->pUpdateRes;
5
54liuyao 已提交
2048
    }
5
54liuyao 已提交
2049

H
Haojun Liao 已提交
2050 2051
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

2052
    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
2053

L
Liu Jicong 已提交
2054
  NEXT_SUBMIT_BLK:
2055
    while (1) {
L
Liu Jicong 已提交
2056
      if (pInfo->tqReader->msg2.msgStr == NULL) {
2057
        if (pInfo->validBlockIndex >= totBlockNum) {
5
54liuyao 已提交
2058
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
L
Liu Jicong 已提交
2059
          doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
2060
          qDebug("stream scan return empty, consume block %d", totBlockNum);
2061 2062
          return NULL;
        }
2063

L
Liu Jicong 已提交
2064 2065
        int32_t      current = pInfo->validBlockIndex++;
        SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
2066
        if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
2067 2068 2069 2070
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          continue;
        }
H
Haojun Liao 已提交
2071 2072
      }

2073 2074
      blockDataCleanup(pInfo->pRes);

2075
      while (tqNextDataBlock(pInfo->tqReader)) {
2076
        SSDataBlock block = {0};
2077

2078
        int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
2079 2080 2081 2082 2083

        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

2084
        setBlockIntoRes(pInfo, &block, false);
2085

H
Haojun Liao 已提交
2086
        if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId,
L
Liu Jicong 已提交
2087
                             pInfo->pRes->info.version)) {
2088 2089 2090 2091 2092
          printDataBlock(pInfo->pRes, "stream scan ignore");
          blockDataCleanup(pInfo->pRes);
          continue;
        }

5
54liuyao 已提交
2093 2094 2095
        if (pInfo->pCreateTbRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return pInfo->pCreateTbRes;
2096 2097
        }

5
54liuyao 已提交
2098
        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
H
Haojun Liao 已提交
2099
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
2100
        pInfo->pRes->info.dataLoad = 1;
2101 2102 2103
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
2104 2105 2106
          break;
        }
      }
2107
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
5
54liuyao 已提交
2108
        break;
J
jiacy-jcy 已提交
2109
      } else {
2110
        continue;
5
54liuyao 已提交
2111
      }
H
Haojun Liao 已提交
2112 2113 2114 2115
    }

    // record the scan action.
    pInfo->numOfExec++;
2116
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
2117
    // printDataBlock(pInfo->pRes, "stream scan");
H
Haojun Liao 已提交
2118

X
Xiaoyu Wang 已提交
2119
    qDebug("scan rows: %" PRId64, pBlockInfo->rows);
L
Liu Jicong 已提交
2120 2121 2122
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }
2123 2124 2125 2126 2127 2128

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
L
Liu Jicong 已提交
2129 2130 2131
  } else {
    ASSERT(0);
    return NULL;
H
Haojun Liao 已提交
2132 2133 2134
  }
}

H
Haojun Liao 已提交
2135
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2136 2137 2138
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2139 2140 2141
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2142 2143 2144 2145 2146 2147
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2148
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
L
Liu Jicong 已提交
2149 2150
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
2151
  SStreamRawScanInfo* pInfo = pOperator->info;
D
dapan1121 已提交
2152
  int32_t             code = TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
2153
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;  // use metaRspLen !=0 to judge if data is meta
wmmhello's avatar
wmmhello 已提交
2154
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
2155

wmmhello's avatar
wmmhello 已提交
2156
  qDebug("tmqsnap doRawScan called");
2157
  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
D
dapan1121 已提交
2158 2159 2160 2161 2162
    bool hasNext = false;
    if (pInfo->dataReader) {
      code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
      if (code) {
        tsdbReleaseDataBlock(pInfo->dataReader);
2163
        T_LONG_JMP(pTaskInfo->env, code);
D
dapan1121 已提交
2164 2165
      }
    }
X
Xiaoyu Wang 已提交
2166

D
dapan1121 已提交
2167
    if (pInfo->dataReader && hasNext) {
wmmhello's avatar
wmmhello 已提交
2168
      if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2169
        tsdbReleaseDataBlock(pInfo->dataReader);
2170
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
wmmhello's avatar
wmmhello 已提交
2171
      }
2172

H
Haojun Liao 已提交
2173 2174
      SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
      if (pBlock == NULL) {
2175
        T_LONG_JMP(pTaskInfo->env, terrno);
wmmhello's avatar
wmmhello 已提交
2176 2177
      }

H
Haojun Liao 已提交
2178
      qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
2179
      tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pBlock->info.id.uid, pBlock->info.window.ekey);
wmmhello's avatar
wmmhello 已提交
2180 2181
      return pBlock;
    }
wmmhello's avatar
wmmhello 已提交
2182 2183

    SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
X
Xiaoyu Wang 已提交
2184
    STqOffsetVal   offset = {0};
L
Liu Jicong 已提交
2185
    if (mtInfo.uid == 0) {  // read snapshot done, change to get data from wal
wmmhello's avatar
wmmhello 已提交
2186
      qDebug("tmqsnap read snapshot done, change to get data from wal");
2187
      tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion);
L
Liu Jicong 已提交
2188
    } else {
2189
      tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN);
2190
      qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
wmmhello's avatar
wmmhello 已提交
2191
    }
2192
    qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
2193
    tDeleteSSchemaWrapper(mtInfo.schema);
wmmhello's avatar
wmmhello 已提交
2194
    return NULL;
2195
  } else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
L
Liu Jicong 已提交
2196 2197 2198 2199 2200 2201
    SSnapContext* sContext = pInfo->sContext;
    void*         data = NULL;
    int32_t       dataLen = 0;
    int16_t       type = 0;
    int64_t       uid = 0;
    if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
wmmhello's avatar
wmmhello 已提交
2202
      qError("tmqsnap getMetafromSnapShot error");
wmmhello's avatar
wmmhello 已提交
2203
      taosMemoryFreeClear(data);
2204 2205 2206
      return NULL;
    }

2207
    if (!sContext->queryMeta) {  // change to get data next poll request
2208 2209 2210
      STqOffsetVal offset = {0};
      tqOffsetResetToData(&offset, 0, INT64_MIN);
      qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
L
Liu Jicong 已提交
2211
    } else {
2212
      tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid);
wmmhello's avatar
wmmhello 已提交
2213 2214 2215 2216
      pTaskInfo->streamInfo.metaRsp.resMsgType = type;
      pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
      pTaskInfo->streamInfo.metaRsp.metaRsp = data;
    }
2217

wmmhello's avatar
wmmhello 已提交
2218
    return NULL;
2219
  }
L
Liu Jicong 已提交
2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257
  //  else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
  //    int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
  //
  //    while(1){
  //      if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
  //        qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
  //        pTaskInfo->streamInfo.lastStatus.version = fetchVer;
  //        pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
  //        return NULL;
  //      }
  //      SWalCont* pHead = &pInfo->pCkHead->head;
  //      qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
  //
  //      if (pHead->msgType == TDMT_VND_SUBMIT) {
  //        SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
  //        tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
  //        SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid,
  //        &pInfo->pRes); if(block){
  //          pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
  //          pTaskInfo->streamInfo.lastStatus.version = fetchVer;
  //          qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
  //          return block;
  //        }else{
  //          fetchVer++;
  //        }
  //      } else{
  //        ASSERT(pInfo->sContext->withMeta);
  //        ASSERT(IS_META_MSG(pHead->msgType));
  //        qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
  //        pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
  //        pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
  //        pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
  //        pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
  //        pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
  //        memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
  //        return NULL;
  //      }
  //    }
2258 2259 2260
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2261
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2262 2263 2264
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
2265
  tableListDestroy(pRawScan->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2266 2267 2268
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2269 2270 2271
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2272
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2273 2274 2275 2276 2277
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2278 2279
  int32_t code = TSDB_CODE_SUCCESS;

2280
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2281
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2282
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2283 2284
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2285 2286
  }

2287
  pInfo->pTableListInfo = tableListCreate();
wmmhello's avatar
wmmhello 已提交
2288 2289
  pInfo->vnode = pHandle->vnode;

2290
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2291 2292
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2293

2294
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2295
  return pOperator;
H
Haojun Liao 已提交
2296

L
Liu Jicong 已提交
2297
_end:
H
Haojun Liao 已提交
2298 2299 2300 2301
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2302 2303
}

2304
static void destroyStreamScanOperatorInfo(void* param) {
2305
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
2306

2307
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
5
54liuyao 已提交
2308
    destroyOperatorInfo(pStreamScan->pTableScanOp);
2309
  }
2310

2311 2312 2313
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
H
Haojun Liao 已提交
2314 2315
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2316
  }
C
Cary Xu 已提交
2317 2318
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2319
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2320
  }
C
Cary Xu 已提交
2321

L
Liu Jicong 已提交
2322
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
5
54liuyao 已提交
2323
  cleanupExprSupp(&pStreamScan->tagCalSup);
L
Liu Jicong 已提交
2324

L
Liu Jicong 已提交
2325
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2326 2327 2328 2329
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2330
  blockDataDestroy(pStreamScan->pUpdateDataRes);
5
54liuyao 已提交
2331
  blockDataDestroy(pStreamScan->pCreateTbRes);
2332 2333 2334 2335
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2336
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2337
                                            STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2338
  SArray*          pColIds = NULL;
2339 2340
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2341

H
Haojun Liao 已提交
2342
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2343
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2344
    tableListDestroy(pTableListInfo);
2345
    goto _error;
H
Haojun Liao 已提交
2346 2347
  }

2348
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2349
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2350

2351
  pInfo->pTagCond = pTagCond;
2352
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2353

2354
  int32_t numOfCols = 0;
2355 2356
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2357
  if (code != TSDB_CODE_SUCCESS) {
2358
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2359 2360
    goto _error;
  }
2361

H
Haojun Liao 已提交
2362
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2363
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2364
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2365
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2366 2367

    int16_t colId = id->colId;
2368
    taosArrayPush(pColIds, &colId);
2369
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2370
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2371
    }
H
Haojun Liao 已提交
2372 2373
  }

L
Liu Jicong 已提交
2374 2375 2376 2377
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2378
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2379 2380
      goto _error;
    }
2381

L
Liu Jicong 已提交
2382 2383 2384
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
2385
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2386 2387 2388 2389
      goto _error;
    }
  }

2390 2391
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
5
54liuyao 已提交
2392
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
2393 2394
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2395
      tableListDestroy(pTableListInfo);
2396 2397 2398 2399
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2400
      tableListDestroy(pTableListInfo);
2401 2402 2403 2404
      goto _error;
    }
  }

L
Liu Jicong 已提交
2405
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
H
Haojun Liao 已提交
2406
  if (pInfo->pBlockLists == NULL) {
2407
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2408
    tableListDestroy(pTableListInfo);
2409
    goto _error;
H
Haojun Liao 已提交
2410 2411
  }

5
54liuyao 已提交
2412
  if (pHandle->vnode) {
2413
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
L
Liu Jicong 已提交
2414
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2415
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2416
      pTSInfo->base.cond.endVersion = pHandle->version;
2417
    }
L
Liu Jicong 已提交
2418

2419
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2420
    int32_t        num = 0;
2421
    tableListGetGroupList(pTableListInfo, 0, &pList, &num);
2422

2423
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2424
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2425
      pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
2426 2427
    }

L
Liu Jicong 已提交
2428 2429 2430 2431
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2432
    } else {
L
Liu Jicong 已提交
2433 2434
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2435 2436
    }

2437
    pInfo->pUpdateInfo = NULL;
2438
    pInfo->pTableScanOp = pTableScanOp;
2439 2440 2441
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2442

L
Liu Jicong 已提交
2443
    pInfo->readHandle = *pHandle;
L
Liu Jicong 已提交
2444
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
5
54liuyao 已提交
2445 2446
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
L
Liu Jicong 已提交
2447

L
Liu Jicong 已提交
2448
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2449
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
2450
    SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
2451
    code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2452 2453 2454 2455
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
2456

L
Liu Jicong 已提交
2457
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2458
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2459 2460
  } else {
    taosArrayDestroy(pColIds);
2461
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2462
    pColIds = NULL;
5
54liuyao 已提交
2463 2464
  }

2465 2466 2467 2468 2469
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2470 2471 2472 2473 2474
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2475
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2476
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2477
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2478
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2479
  pInfo->groupId = 0;
2480
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2481
  pInfo->pStreamScanOp = pOperator;
2482
  pInfo->deleteDataIndex = 0;
2483
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2484
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2485
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2486
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2487
  pInfo->partitionSup.needCalc = false;
5
54liuyao 已提交
2488 2489
  pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
  pInfo->igExpired = pTableScanNode->igExpired;
2490
  pInfo->twAggSup.maxTs = INT64_MIN;
L
Liu Jicong 已提交
2491

L
Liu Jicong 已提交
2492 2493
  setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2494
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2495

2496
  __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2497 2498
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2499

H
Haojun Liao 已提交
2500
  return pOperator;
2501

L
Liu Jicong 已提交
2502
_error:
H
Haojun Liao 已提交
2503 2504 2505 2506 2507 2508 2509 2510
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2511 2512
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2513 2514
}

2515
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
2516 2517 2518 2519
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

2520 2521 2522
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  STagScanInfo* pInfo = pOperator->info;
2523
  SExprInfo*    pExprInfo = &pOperator->exprSupp.pExprInfo[0];
2524
  SSDataBlock*  pRes = pInfo->pRes;
2525
  blockDataCleanup(pRes);
H
Haojun Liao 已提交
2526

2527
  int32_t size = tableListGetSize(pInfo->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2528
  if (size == 0) {
H
Haojun Liao 已提交
2529 2530 2531 2532
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

2533 2534 2535
  char        str[512] = {0};
  int32_t     count = 0;
  SMetaReader mr = {0};
2536
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);
H
Haojun Liao 已提交
2537

wmmhello's avatar
wmmhello 已提交
2538
  while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
2539
    STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
L
Liu Jicong 已提交
2540
    int32_t        code = metaGetTableEntryByUid(&mr, item->uid);
2541
    tDecoderClear(&mr.coder);
H
Haojun Liao 已提交
2542
    if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
2543 2544
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
2545
      metaReaderClear(&mr);
2546
      T_LONG_JMP(pTaskInfo->env, terrno);
H
Haojun Liao 已提交
2547
    }
H
Haojun Liao 已提交
2548

2549
    for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
2550 2551 2552 2553 2554
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
        STR_TO_VARSTR(str, mr.me.name);
2555
        colDataSetVal(pDst, count, str, false);
2556
      } else {  // it is a tag value
wmmhello's avatar
wmmhello 已提交
2557 2558
        STagVal val = {0};
        val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
2559
        const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val);
wmmhello's avatar
wmmhello 已提交
2560

2561 2562 2563 2564
        char* data = NULL;
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
          data = tTagValToData((const STagVal*)p, false);
        } else {
wmmhello's avatar
wmmhello 已提交
2565 2566
          data = (char*)p;
        }
2567
        colDataSetVal(pDst, count, data,
L
Liu Jicong 已提交
2568
                      (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
2569

2570 2571
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
            data != NULL) {
wmmhello's avatar
wmmhello 已提交
2572
          taosMemoryFree(data);
wmmhello's avatar
wmmhello 已提交
2573
        }
H
Haojun Liao 已提交
2574 2575 2576
      }
    }

2577
    count += 1;
wmmhello's avatar
wmmhello 已提交
2578
    if (++pInfo->curPos >= size) {
H
Haojun Liao 已提交
2579
      setOperatorCompleted(pOperator);
H
Haojun Liao 已提交
2580 2581 2582
    }
  }

2583 2584
  metaReaderClear(&mr);

2585
  // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
H
Haojun Liao 已提交
2586
  if (pOperator->status == OP_EXEC_DONE) {
2587
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
H
Haojun Liao 已提交
2588 2589 2590
  }

  pRes->info.rows = count;
wmmhello's avatar
wmmhello 已提交
2591
  pOperator->resultInfo.totalRows += count;
2592

2593
  return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
H
Haojun Liao 已提交
2594 2595
}

2596
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2597 2598
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2599
  taosArrayDestroy(pInfo->matchInfo.pList);
2600
  pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
D
dapan1121 已提交
2601
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2602 2603
}

S
slzhou 已提交
2604
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
X
Xiaoyu Wang 已提交
2605
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
2606
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
2607 2608 2609 2610 2611
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2612 2613 2614 2615
  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
2616
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
2617 2618 2619
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2620

H
Haojun Liao 已提交
2621 2622
  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
2623 2624 2625
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2626

2627
  pInfo->pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
2628
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2629 2630
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
2631

L
Liu Jicong 已提交
2632 2633
  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2634
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2635 2636
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

L
Liu Jicong 已提交
2637 2638
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
2639 2640

  return pOperator;
2641

2642
_error:
H
Haojun Liao 已提交
2643 2644 2645 2646 2647
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2648

dengyihao's avatar
dengyihao 已提交
2649
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2650 2651 2652 2653 2654 2655
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
2656
  int32_t                         code = 0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2657

H
Haojun Liao 已提交
2658
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2659

L
Liu Jicong 已提交
2660
  int64_t      st = taosGetTimestampUs();
2661
  void*        p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2662
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2663

D
dapan1121 已提交
2664
  if (NULL == source->dataReader || !source->multiReader) {
D
dapan1121 已提交
2665
    code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false);
2666 2667 2668
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
dengyihao's avatar
dengyihao 已提交
2669
  }
X
Xiaoyu Wang 已提交
2670

2671
  pInfo->base.dataReader = source->dataReader;
H
Haojun Liao 已提交
2672
  STsdbReader* reader = pInfo->base.dataReader;
X
Xiaoyu Wang 已提交
2673
  bool         hasNext = false;
2674
  qTrace("tsdb/read-table-data: %p, enter next reader", reader);
D
dapan1121 已提交
2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686

  while (true) {
    code = tsdbNextDataBlock(reader, &hasNext);
    if (code != 0) {
      tsdbReleaseDataBlock(reader);
      pInfo->base.dataReader = NULL;
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
2687

H
Haojun Liao 已提交
2688
    if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2689
      tsdbReleaseDataBlock(reader);
D
dapan1121 已提交
2690
      pInfo->base.dataReader = NULL;
2691
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2692 2693 2694
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2695
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2696 2697 2698 2699
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2700
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2701 2702 2703 2704
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2705 2706

    uint32_t status = 0;
2707
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2708
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2709
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2710
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2711 2712 2713 2714 2715 2716 2717
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

2718
    pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2719

H
Haojun Liao 已提交
2720
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2721
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2722

2723
    qTrace("tsdb/read-table-data: %p, close reader", reader);
D
dapan1121 已提交
2724 2725 2726 2727
    if (!source->multiReader) {
      tsdbReaderClose(pInfo->base.dataReader);
      source->dataReader = NULL;
    }
H
Haojun Liao 已提交
2728
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2729 2730
    return pBlock;
  }
H
Haojun Liao 已提交
2731

D
dapan1121 已提交
2732 2733 2734 2735
  if (!source->multiReader) {
    tsdbReaderClose(pInfo->base.dataReader);
    source->dataReader = NULL;
  }
H
Haojun Liao 已提交
2736
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2737 2738 2739
  return NULL;
}

2740 2741 2742
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsTargetSlotId = 0;
  for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
H
Haojun Liao 已提交
2743
    SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
2744
    if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2745
      tsTargetSlotId = colInfo->dstSlotId;
2746 2747 2748
    }
  }

2749 2750 2751
  SArray*         pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  SBlockOrderInfo bi = {0};
  bi.order = order;
2752
  bi.slotId = tsTargetSlotId;
2753 2754 2755 2756 2757 2758 2759
  bi.nullFirst = NULL_ORDER_FIRST;

  taosArrayPush(pList, &bi);

  return pList;
}

H
Haojun Liao 已提交
2760
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2761 2762 2763 2764 2765 2766 2767
  memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  for (int i = 0; i < src->numOfCols; i++) {
    dst->colList[i] = src->colList[i];
  }
  return 0;
}
H
Haojun Liao 已提交
2768

2769
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
2770 2771 2772
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

S
slzhou 已提交
2773
  {
2774
    size_t  numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
S
slzhou 已提交
2775
    int32_t i = pInfo->tableStartIndex + 1;
H
Haojun Liao 已提交
2776
    for (; i < numOfTables; ++i) {
2777
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
S
slzhou 已提交
2778 2779 2780 2781 2782 2783
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }
2784

S
slzhou 已提交
2785 2786
  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;
2787

H
Haojun Liao 已提交
2788
  pInfo->base.dataReader = NULL;
2789

2790 2791
  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
S
slzhou 已提交
2792
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
2793
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
L
Liu Jicong 已提交
2794 2795
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);
2796

dengyihao's avatar
dengyihao 已提交
2797
  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
dengyihao's avatar
opt mem  
dengyihao 已提交
2798 2799 2800 2801 2802 2803

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  for (int32_t i = 0; i < numOfTable; ++i) {
2804 2805 2806
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
D
dapan1121 已提交
2807
    param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false;
2808
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
H
Haojun Liao 已提交
2809 2810
    blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);

2811
    taosArrayPush(pInfo->sortSourceParams, &param);
dengyihao's avatar
opt mem  
dengyihao 已提交
2812 2813

    SQueryTableDataCond cond;
H
Haojun Liao 已提交
2814
    dumpQueryTableCond(&pInfo->base.cond, &cond);
dengyihao's avatar
opt mem  
dengyihao 已提交
2815
    taosArrayPush(pInfo->queryConds, &cond);
2816 2817
  }

dengyihao's avatar
opt mem  
dengyihao 已提交
2818
  for (int32_t i = 0; i < numOfTable; ++i) {
2819
    SSortSource*                    ps = taosMemoryCalloc(1, sizeof(SSortSource));
2820
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
2821
    ps->param = param;
2822
    ps->onlyRef = true;
2823 2824 2825 2826 2827 2828
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
2829
    T_LONG_JMP(pTaskInfo->env, terrno);
2830 2831
  }

2832 2833 2834 2835 2836 2837 2838
  return TSDB_CODE_SUCCESS;
}

int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

dengyihao's avatar
dengyihao 已提交
2839
  int32_t numOfTable = taosArrayGetSize(pInfo->queryConds);
2840

2841 2842 2843 2844 2845 2846 2847
  SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
  pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
  pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
  pInfo->sortExecInfo.loops += sortExecInfo.loops;
  pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
  pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;

dengyihao's avatar
dengyihao 已提交
2848
  for (int32_t i = 0; i < numOfTable; ++i) {
2849 2850
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    blockDataDestroy(param->inputBlock);
2851 2852
    tsdbReaderClose(param->dataReader);
    param->dataReader = NULL;
2853
  }
2854 2855
  taosArrayClear(pInfo->sortSourceParams);

2856
  tsortDestroySortHandle(pInfo->pSortHandle);
dengyihao's avatar
dengyihao 已提交
2857
  pInfo->pSortHandle = NULL;
2858

dengyihao's avatar
opt mem  
dengyihao 已提交
2859 2860 2861
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) {
    SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
    taosMemoryFree(cond->colList);
2862
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2863 2864 2865
  taosArrayDestroy(pInfo->queryConds);
  pInfo->queryConds = NULL;

2866
  resetLimitInfoForNextGroup(&pInfo->limitInfo);
2867 2868 2869
  return TSDB_CODE_SUCCESS;
}

2870 2871
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2872 2873
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2874 2875 2876
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2877
  blockDataCleanup(pResBlock);
2878 2879

  while (1) {
2880
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2881 2882 2883 2884
    if (pTupleHandle == NULL) {
      break;
    }

2885 2886
    appendOneRowToDataBlock(pResBlock, pTupleHandle);
    if (pResBlock->info.rows >= capacity) {
2887 2888 2889 2890
      break;
    }
  }

2891
  bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
D
dapan1121 已提交
2892
  qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
2893
         pInfo->limitInfo.numOfOutputRows);
2894

2895
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907
}

SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
2908
    T_LONG_JMP(pTaskInfo->env, code);
2909
  }
2910

2911
  size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
S
slzhou 已提交
2912 2913
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;
2914

S
slzhou 已提交
2915
    if (tableListSize == 0) {
H
Haojun Liao 已提交
2916
      setOperatorCompleted(pOperator);
2917 2918
      return NULL;
    }
S
slzhou 已提交
2919
    pInfo->tableStartIndex = 0;
2920
    pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
2921 2922
    startGroupTableMergeScan(pOperator);
  }
2923

S
slzhou 已提交
2924 2925
  SSDataBlock* pBlock = NULL;
  while (pInfo->tableStartIndex < tableListSize) {
2926 2927 2928 2929
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

L
Liu Jicong 已提交
2930 2931
    pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
                                              pOperator);
S
slzhou 已提交
2932
    if (pBlock != NULL) {
H
Haojun Liao 已提交
2933
      pBlock->info.id.groupId = pInfo->groupId;
S
slzhou 已提交
2934 2935 2936
      pOperator->resultInfo.totalRows += pBlock->info.rows;
      return pBlock;
    } else {
2937
      // Data of this group are all dumped, let's try the next group
S
slzhou 已提交
2938 2939
      stopGroupTableMergeScan(pOperator);
      if (pInfo->tableEndIndex >= tableListSize - 1) {
H
Haojun Liao 已提交
2940
        setOperatorCompleted(pOperator);
S
slzhou 已提交
2941 2942
        break;
      }
2943

S
slzhou 已提交
2944
      pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
2945
      pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
S
slzhou 已提交
2946
      startGroupTableMergeScan(pOperator);
X
Xiaoyu Wang 已提交
2947
      resetLimitInfoForNextGroup(&pInfo->limitInfo);
S
slzhou 已提交
2948
    }
wmmhello's avatar
wmmhello 已提交
2949 2950
  }

2951 2952 2953
  return pBlock;
}

2954
void destroyTableMergeScanOperatorInfo(void* param) {
2955
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2956
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2957

dengyihao's avatar
dengyihao 已提交
2958 2959 2960
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2961 2962
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
2963 2964
    tsdbReaderClose(p->dataReader);
    p->dataReader = NULL;
2965
  }
H
Haojun Liao 已提交
2966

D
dapan1121 已提交
2967 2968 2969
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;

2970
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2971 2972
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2973

dengyihao's avatar
opt mem  
dengyihao 已提交
2974 2975 2976
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2977 2978
  }

2979 2980
  taosArrayDestroy(pTableScanInfo->queryConds);
  destroyTableScanBase(&pTableScanInfo->base);
2981 2982 2983 2984 2985

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
D
dapan1121 已提交
2986
  taosMemoryFreeClear(param);
2987 2988 2989 2990
}

int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
2991 2992
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
L
Liu Jicong 已提交
2993
  STableMergeScanInfo*     pInfo = pOptr->info;
H
Haojun Liao 已提交
2994
  execInfo->blockRecorder = pInfo->base.readRecorder;
2995
  execInfo->sortExecInfo = pInfo->sortExecInfo;
2996 2997 2998

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);
L
Liu Jicong 已提交
2999

3000 3001 3002
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3003
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
3004
                                                STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
3005 3006 3007 3008 3009
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3010

3011 3012 3013
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
3014
  int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
H
Haojun Liao 已提交
3015
                                     &pInfo->base.matchInfo);
H
Haojun Liao 已提交
3016 3017 3018
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3019

H
Haojun Liao 已提交
3020
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
3021
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3022
    taosArrayDestroy(pInfo->base.matchInfo.pList);
3023 3024 3025 3026
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
3027
    SExprSupp* pSup = &pInfo->base.pseudoSup;
3028 3029
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
3030 3031 3032 3033
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

H
Haojun Liao 已提交
3034 3035 3036 3037 3038 3039
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

H
Haojun Liao 已提交
3040 3041
  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
3042
  pInfo->base.readHandle = *readHandle;
3043 3044 3045

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;
3046
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
3047

3048
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3049
  pInfo->sample.seed = taosGetTimestampSec();
H
Haojun Liao 已提交
3050 3051 3052 3053 3054 3055

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
3056
  initResultSizeInfo(&pOperator->resultInfo, 1024);
H
Haojun Liao 已提交
3057
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
3058 3059
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

3060
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3061

H
Haojun Liao 已提交
3062
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
3063
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3064
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
3065

dengyihao's avatar
dengyihao 已提交
3066
  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
A
Alex Duan 已提交
3067 3068
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
3069

L
Liu Jicong 已提交
3070 3071
  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3072
  pOperator->exprSupp.numOfExprs = numOfCols;
3073

3074 3075
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
3076 3077 3078 3079 3080 3081 3082 3083 3084
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
3085 3086 3087 3088

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
3089
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
3090 3091 3092 3093 3094 3095
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName);
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
3096 3097
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
3098 3099 3100 3101 3102 3103 3104
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, scanCols) {
      if (nodeType(pNode) != QUERY_NODE_TARGET) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      STargetNode* targetNode = (STargetNode*)pNode;
      if (nodeType(targetNode->pExpr) != QUERY_NODE_COLUMN) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SColumnNode* colNode = (SColumnNode*)(targetNode->pExpr);
      if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
        supp->dbNameSlotId = targetNode->slotId;
      } else if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
        supp->stbNameSlotId = targetNode->slotId;
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}

int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, pseudoCols) {
      if (nodeType(pNode) != QUERY_NODE_TARGET) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      STargetNode* targetNode = (STargetNode*)pNode;
      if (nodeType(targetNode->pExpr) != QUERY_NODE_FUNCTION) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SFunctionNode* funcNode = (SFunctionNode*)(targetNode->pExpr);
      if (funcNode->funcType == FUNCTION_TYPE_TABLE_COUNT) {
        supp->tbCountSlotId = targetNode->slotId;
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}

int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, groupTags) {
      if (nodeType(pNode) != QUERY_NODE_COLUMN) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SColumnNode* colNode = (SColumnNode*)pNode;
      if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
        supp->groupByDbName = true;
      }
      if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
        supp->groupByStbName = true;
      }
    }
  } else {
H
Haojun Liao 已提交
3166 3167
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
S
slzhou 已提交
3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195
  }
  return TSDB_CODE_SUCCESS;
}

int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t code = 0;
  code = tblCountScanGetInputs(groupTags, tableName, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return code;
  }
  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  code = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return code;
  }
  code = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
    return code;
  }
  return code;
}
S
shenglian zhou 已提交
3196

S
slzhou 已提交
3197
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3198 3199 3200
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3201
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3202
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3203
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3204 3205 3206 3207 3208 3209 3210 3211 3212

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3213
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3214 3215
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3216 3217 3218
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3219 3220 3221

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3222 3223
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3235 3236 3237
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
3238
    ASSERT(strlen(dbName));
S
slzhou 已提交
3239
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);
H
Haojun Liao 已提交
3240 3241 3242 3243

    char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);

S
slzhou 已提交
3244
    varDataSetLen(varDbName, strlen(dbName));
3245
    colDataSetVal(colInfoData, 0, varDbName, false);
S
slzhou 已提交
3246 3247 3248 3249
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
3250
    if (strlen(stbName) != 0) {
S
slzhou 已提交
3251
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
H
Haojun Liao 已提交
3252
      strncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
3253
      varDataSetLen(varStbName, strlen(stbName));
3254
      colDataSetVal(colInfoData, 0, varStbName, false);
3255
    } else {
3256
      colDataSetNULL(colInfoData, 0);
3257
    }
S
slzhou 已提交
3258 3259 3260
  }

  if (pSupp->tbCountSlotId != -1) {
S
slzhou 已提交
3261
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
3262
    colDataSetVal(colInfoData, 0, (char*)&count, false);
S
slzhou 已提交
3263 3264 3265 3266
  }
  pRes->info.rows = 1;
}

S
slzhou 已提交
3267
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
S
slzhou 已提交
3268 3269 3270
  STableCountScanSupp* pSupp = &pInfo->supp;
  SSDataBlock*         pRes = pInfo->pRes;

S
slzhou 已提交
3271
  size_t infodbTableNum;
S
slzhou 已提交
3272
  getInfosDbMeta(NULL, &infodbTableNum);
S
slzhou 已提交
3273
  size_t perfdbTableNum;
S
slzhou 已提交
3274 3275
  getPerfDbMeta(NULL, &perfdbTableNum);

3276
  if (pSupp->groupByDbName || pSupp->groupByStbName) {
S
slzhou 已提交
3277
    buildSysDbGroupedTableCount(pOperator, pInfo, pSupp, pRes, infodbTableNum, perfdbTableNum);
S
slzhou 已提交
3278 3279
    return (pRes->info.rows > 0) ? pRes : NULL;
  } else {
S
slzhou 已提交
3280
    buildSysDbFilterTableCount(pOperator, pSupp, pRes, infodbTableNum, perfdbTableNum);
S
slzhou 已提交
3281 3282 3283 3284
    return (pRes->info.rows > 0) ? pRes : NULL;
  }
}

S
slzhou 已提交
3285 3286 3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  if (strcmp(pSupp->dbNameFilter, TSDB_INFORMATION_SCHEMA_DB) == 0) {
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (strcmp(pSupp->dbNameFilter, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  } else if (strlen(pSupp->dbNameFilter) == 0) {
    fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
  }
  setOperatorCompleted(pOperator);
}

static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  if (pInfo->currGrpIdx == 0) {
3301 3302 3303 3304 3305 3306
    uint64_t groupId = 0;
    if (pSupp->groupByDbName) {
      groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
    } else {
      groupId = calcGroupId("", 0);
    }
X
Xiaoyu Wang 已提交
3307

S
slzhou 已提交
3308 3309 3310
    pRes->info.id.groupId = groupId;
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (pInfo->currGrpIdx == 1) {
3311 3312 3313 3314 3315 3316 3317
    uint64_t groupId = 0;
    if (pSupp->groupByDbName) {
      groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
    } else {
      groupId = calcGroupId("", 0);
    }

S
slzhou 已提交
3318 3319 3320 3321 3322 3323 3324 3325
    pRes->info.id.groupId = groupId;
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  } else {
    setOperatorCompleted(pOperator);
  }
  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3326
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
S
slzhou 已提交
3327 3328 3329 3330
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  STableCountScanOperatorInfo* pInfo = pOperator->info;
  STableCountScanSupp*         pSupp = &pInfo->supp;
  SSDataBlock*                 pRes = pInfo->pRes;
S
slzhou 已提交
3331
  blockDataCleanup(pRes);
3332

S
slzhou 已提交
3333 3334 3335
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }
S
slzhou 已提交
3336
  if (pInfo->readHandle.mnd != NULL) {
S
slzhou 已提交
3337
    return buildSysDbTableCount(pOperator, pInfo);
S
slzhou 已提交
3338
  }
S
slzhou 已提交
3339

S
slzhou 已提交
3340 3341 3342 3343 3344
  return buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
}

static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
S
slzhou 已提交
3345 3346
  const char* db = NULL;
  int32_t     vgId = 0;
S
slzhou 已提交
3347
  char        dbName[TSDB_DB_NAME_LEN] = {0};
S
slzhou 已提交
3348

S
slzhou 已提交
3349 3350 3351 3352 3353 3354
  // get dbname
  vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
  SName sn = {0};
  tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
  tNameGetDbName(&sn, dbName);

3355
  if (pSupp->groupByDbName || pSupp->groupByStbName) {
S
slzhou 已提交
3356 3357 3358 3359 3360 3361 3362 3363 3364 3365 3366 3367 3368 3369
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
  } else {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
  }
  return pRes->info.rows > 0 ? pRes : NULL;
}

static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  if (pSupp->groupByStbName) {
    if (pInfo->stbUidList == NULL) {
      pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
      if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
        qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
S
slzhou 已提交
3370
      }
S
slzhou 已提交
3371 3372 3373 3374 3375 3376 3377 3378 3379 3380
    }
    if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
      tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
      buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid);

      pInfo->currGrpIdx++;
    } else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
      buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);

      pInfo->currGrpIdx++;
S
slzhou 已提交
3381
    } else {
S
slzhou 已提交
3382
      setOperatorCompleted(pOperator);
S
slzhou 已提交
3383 3384
    }
  } else {
S
slzhou 已提交
3385 3386 3387 3388 3389 3390 3391 3392 3393 3394 3395 3396 3397 3398 3399 3400 3401
    uint64_t groupId = calcGroupId(dbName, strlen(dbName));
    pRes->info.id.groupId = groupId;
    int64_t dbTableCount = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
    setOperatorCompleted(pOperator);
  }
}

static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  if (strlen(pSupp->dbNameFilter) != 0) {
    if (strlen(pSupp->stbNameFilter) != 0) {
      tb_uid_t      uid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter);
      SMetaStbStats stats = {0};
      metaGetStbStats(pInfo->readHandle.meta, uid, &stats);
      int64_t ctbNum = stats.ctbNum;
      fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, ctbNum, pRes);
S
slzhou 已提交
3402 3403 3404
    } else {
      int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
      fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
S
slzhou 已提交
3405
    }
S
slzhou 已提交
3406 3407 3408
  } else {
    int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
S
slzhou 已提交
3409
  }
S
slzhou 已提交
3410 3411 3412 3413 3414 3415
  setOperatorCompleted(pOperator);
}

static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName) {
  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
3416 3417 3418
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
  }
X
Xiaoyu Wang 已提交
3419

S
slzhou 已提交
3420 3421 3422
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;
  int64_t ntbNum = metaGetNtbNum(pInfo->readHandle.meta);
3423 3424 3425
  if (ntbNum != 0) {
    fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
  }
S
slzhou 已提交
3426 3427 3428 3429 3430 3431 3432 3433
}

static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
  char stbName[TSDB_TABLE_NAME_LEN] = {0};
  metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);

  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
3434 3435 3436 3437 3438
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
  } else {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s", stbName);
  }
X
Xiaoyu Wang 已提交
3439

S
slzhou 已提交
3440 3441 3442 3443 3444 3445 3446 3447
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;

  SMetaStbStats stats = {0};
  metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats);
  int64_t ctbNum = stats.ctbNum;

  fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
S
shenglian zhou 已提交
3448 3449 3450
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3451
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3452 3453
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3454
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3455 3456
  taosMemoryFreeClear(param);
}