scanoperator.c 130.2 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

D
dapan1121 已提交
34 35
int32_t scanDebug = 0;

X
Xiaoyu Wang 已提交
36
#define MULTI_READER_MAX_TABLE_NUM   5000
H
Haojun Liao 已提交
37
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
38
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
39

H
Haojun Liao 已提交
40 41 42 43 44 45 46 47 48
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;
  SSortExecInfo          sortExecInfo;
} STableMergeScanExecInfo;

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
  uint64_t       uid;
X
Xiaoyu Wang 已提交
49
  SSDataBlock*   inputBlock;
D
dapan1121 已提交
50
  bool           multiReader;
51
  STsdbReader*   dataReader;
H
Haojun Liao 已提交
52 53
} STableMergeScanSortSourceParam;

54 55 56 57 58 59 60 61 62 63
typedef struct STableCountScanOperatorInfo {
  SReadHandle  readHandle;
  SSDataBlock* pRes;

  STableCountScanSupp supp;

  int32_t currGrpIdx;
  SArray* stbUidList;  // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;

L
Liu Jicong 已提交
64
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
65

H
Haojun Liao 已提交
66
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
67 68 69 70 71 72 73 74 75 76 77 78
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#else
  return true;
#endif
}

79
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
H
Haojun Liao 已提交
80 81 82 83 84
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SWITCH_ORDER(pCtx[i].order);
  }
}

85 86 87 88 89 90 91 92 93
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
  if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  int64_t key = tw->skey, interval = pInterval->interval;
94
  // convert key to second
95 96 97 98 99 100 101
  key = convertTimePrecision(key, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  if (pInterval->intervalUnit == 'y') {
    interval *= 12;
  }

  struct tm tm;
102
  time_t    t = (time_t)key;
103
  taosLocalTime(&t, &tm, NULL);
104 105 106 107

  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
108
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
109 110 111 112

  mon = (int)(mon + interval);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
113
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
114 115 116 117

  tw->ekey -= 1;
}

118
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
119 120 121 122 123 124 125
  STimeWindow w = {0};

  // 0 by default, which means it is not a interval operator of the upstream operator.
  if (pInterval->interval == 0) {
    return false;
  }

126
  if (order == TSDB_ORDER_ASC) {
127
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
128
    ASSERT(w.ekey >= pBlockInfo->window.skey);
129

130
    if (w.ekey < pBlockInfo->window.ekey) {
131 132 133
      return true;
    }

134 135
    while (1) {
      getNextTimeWindow(pInterval, &w, order);
136 137 138 139
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

140
      ASSERT(w.ekey > pBlockInfo->window.ekey);
141
      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
142 143 144 145
        return true;
      }
    }
  } else {
146
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
147
    ASSERT(w.skey <= pBlockInfo->window.ekey);
148

149
    if (w.skey > pBlockInfo->window.skey) {
150 151 152
      return true;
    }

153
    while (1) {
154 155 156 157 158
      getNextTimeWindow(pInterval, &w, order);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

H
Haojun Liao 已提交
159
      ASSERT(w.skey < pBlockInfo->window.skey);
160
      if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
161 162 163
        return true;
      }
    }
164 165 166 167 168
  }

  return false;
}

169 170 171 172 173 174 175 176 177 178 179
// this function is for table scanner to extract temporary results of upstream aggregate results.
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  int64_t buf[2] = {0};
  SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId);

  STableScanInfo* pTableScanInfo = pOperator->info;

S
slzhou 已提交
180 181
  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable,
                                                               buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
182 183 184 185 186

  if (p1 == NULL) {
    return NULL;
  }

H
Haojun Liao 已提交
187
  *pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId);
188 189 190
  if (NULL == *pPage) {
    return NULL;
  }
L
Liu Jicong 已提交
191

192 193 194 195 196 197
  return (SResultRow*)((char*)(*pPage) + p1->offset);
}

static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pTableScanInfo = pOperator->info;

H
Haojun Liao 已提交
198
  if (pTableScanInfo->base.pdInfo.pExprSup == NULL) {
199 200 201
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
202
  SExprSupp* pSup1 = pTableScanInfo->base.pdInfo.pExprSup;
203 204

  SFilePage*  pPage = NULL;
H
Haojun Liao 已提交
205
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
206 207 208 209 210 211 212 213 214

  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool notLoadBlock = true;
  for (int32_t i = 0; i < pSup1->numOfExprs; ++i) {
    int32_t functionId = pSup1->pCtx[i].functionId;

H
Haojun Liao 已提交
215
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset);
216 217 218 219 220 221 222 223 224

    int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window);
    if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) {
      notLoadBlock = false;
      break;
    }
  }

  // release buffer pages
H
Haojun Liao 已提交
225
  releaseBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, pPage);
226 227 228 229 230 231 232 233

  if (notLoadBlock) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
234
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
235
                               int32_t numOfRows) {
H
Haojun Liao 已提交
236
  if (pColsAgg == NULL || pFilterInfo == NULL) {
H
Haojun Liao 已提交
237 238 239
    return true;
  }

H
Haojun Liao 已提交
240
  bool keep = filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
H
Haojun Liao 已提交
241 242 243
  return keep;
}

H
Haojun Liao 已提交
244
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
245
  bool    allColumnsHaveAgg = true;
246
  int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, pBlock, &allColumnsHaveAgg);
H
Haojun Liao 已提交
247
  if (code != TSDB_CODE_SUCCESS) {
248
    T_LONG_JMP(pTaskInfo->env, code);
H
Haojun Liao 已提交
249 250 251 252 253 254 255 256
  }

  if (!allColumnsHaveAgg) {
    return false;
  }
  return true;
}

H
Haojun Liao 已提交
257
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
258
                               int32_t rows) {
H
Haojun Liao 已提交
259 260 261
  if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
    SExprSupp* pSup = &pTableScanInfo->pseudoSup;

262
    int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
263
                                          GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);
H
Haojun Liao 已提交
264
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
H
Haojun Liao 已提交
265
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
H
Haojun Liao 已提交
266 267
      T_LONG_JMP(pTaskInfo->env, code);
    }
H
Haojun Liao 已提交
268 269 270

    // reset the error code.
    terrno = 0;
H
Haojun Liao 已提交
271 272 273
  }
}

274
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
275
  SLimit*     pLimit = &pLimitInfo->limit;
H
Haojun Liao 已提交
276
  const char* id = GET_TASKID(pTaskInfo);
277

278
  if (pLimitInfo->remainOffset > 0) {
279 280
    if (pLimitInfo->remainOffset >= pBlock->info.rows) {
      pLimitInfo->remainOffset -= pBlock->info.rows;
H
Haojun Liao 已提交
281
      blockDataEmpty(pBlock);
H
Haojun Liao 已提交
282
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
283
      return false;
284
    } else {
285
      blockDataTrimFirstRows(pBlock, pLimitInfo->remainOffset);
286 287 288 289 290 291
      pLimitInfo->remainOffset = 0;
    }
  }

  if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
    // limit the output rows
292
    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
293
    blockDataKeepFirstNRows(pBlock, keep);
294 295

    pLimitInfo->numOfOutputRows += pBlock->info.rows;
H
Haojun Liao 已提交
296
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
297
    return true;
298
  }
299

300
  pLimitInfo->numOfOutputRows += pBlock->info.rows;
301
  return false;
302 303
}

H
Haojun Liao 已提交
304
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
L
Liu Jicong 已提交
305
                             uint32_t* status) {
S
slzhou 已提交
306
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
307
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
H
Haojun Liao 已提交
308 309

  pCost->totalBlocks += 1;
310
  pCost->totalRows += pBlock->info.rows;
311

H
Haojun Liao 已提交
312
  bool loadSMA = false;
H
Haojun Liao 已提交
313
  *status = pTableScanInfo->dataBlockLoadFlag;
H
Haojun Liao 已提交
314
  if (pOperator->exprSupp.pFilterInfo != NULL ||
315
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
316 317 318 319
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
320
  taosMemoryFreeClear(pBlock->pBlockAgg);
321 322

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
X
Xiaoyu Wang 已提交
323
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
324
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
325
    pCost->filterOutBlocks += 1;
326
    pCost->totalRows += pBlock->info.rows;
327
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
328 329
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
X
Xiaoyu Wang 已提交
330 331 332
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows,
           pBlockInfo->id.uid);
G
Ganlin Zhao 已提交
333
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
334
    pCost->skipBlocks += 1;
335
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
336
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
337
  } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
338
    pCost->loadBlockStatis += 1;
L
Liu Jicong 已提交
339
    loadSMA = true;  // mark the operation of load sma;
H
Haojun Liao 已提交
340
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
L
Liu Jicong 已提交
341
    if (success) {  // failed to load the block sma data, data block statistics does not exist, load data block instead
X
Xiaoyu Wang 已提交
342
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
343
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
G
Ganlin Zhao 已提交
344
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
345
      tsdbReleaseDataBlock(pTableScanInfo->dataReader);
346 347
      return TSDB_CODE_SUCCESS;
    } else {
348
      qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
349
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
350
    }
H
Haojun Liao 已提交
351
  }
352

H
Haojun Liao 已提交
353
  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
354

H
Haojun Liao 已提交
355
  // try to filter data block according to sma info
H
Haojun Liao 已提交
356
  if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
357 358 359
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      size_t size = taosArrayGetSize(pBlock->pDataBlock);
H
Haojun Liao 已提交
360
      bool   keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
361
      if (!keep) {
X
Xiaoyu Wang 已提交
362 363
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
               GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
364 365 366
        pCost->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

367
        tsdbReleaseDataBlock(pTableScanInfo->dataReader);
368 369
        return TSDB_CODE_SUCCESS;
      }
370
    }
H
Haojun Liao 已提交
371
  }
372

373 374 375
  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

376
  // try to filter data block according to current results
377 378
  doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
X
Xiaoyu Wang 已提交
379 380
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
381
    pCost->skipBlocks += 1;
382
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
383
    *status = FUNC_DATA_REQUIRED_FILTEROUT;
384 385 386
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
387 388
  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;
389

H
Haojun Liao 已提交
390 391
  SSDataBlock* p = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (p == NULL) {
H
Haojun Liao 已提交
392
    return terrno;
H
Haojun Liao 已提交
393 394
  }

H
Haojun Liao 已提交
395
  ASSERT(p == pBlock);
396
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
397

H
Haojun Liao 已提交
398 399
  // restore the previous value
  pCost->totalRows -= pBlock->info.rows;
400

H
Haojun Liao 已提交
401
  if (pOperator->exprSupp.pFilterInfo != NULL) {
402
    int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
403
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
404

405 406
    double el = (taosGetTimestampUs() - st) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;
407

408 409
    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
D
dapan1121 已提交
410
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
411 412 413 414
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
415 416
  }

417
  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
X
Xiaoyu Wang 已提交
418
  if (limitReached) {  // set operator flag is done
419 420
    setOperatorCompleted(pOperator);
  }
421

H
Haojun Liao 已提交
422
  pCost->totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
423 424 425
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
426
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
H
Haojun Liao 已提交
427 428 429
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);

  switchCtxOrder(pCtx, numOfOutput);
430
  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
H
Haojun Liao 已提交
431 432
  STimeWindow* pTWindow = &pTableScanInfo->cond.twindows;
  TSWAP(pTWindow->skey, pTWindow->ekey);
H
Haojun Liao 已提交
433 434
}

435 436
typedef struct STableCachedVal {
  const char* pName;
437
  STag*       pTags;
438 439
} STableCachedVal;

440 441 442 443 444 445 446 447 448 449 450
static void freeTableCachedVal(void* param) {
  if (param == NULL) {
    return;
  }

  STableCachedVal* pVal = param;
  taosMemoryFree((void*)pVal->pName);
  taosMemoryFree(pVal->pTags);
  taosMemoryFree(pVal);
}

H
Haojun Liao 已提交
451 452
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
453
  pVal->pName = taosStrdup(pMetaReader->me.name);
H
Haojun Liao 已提交
454 455 456 457
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
458
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
459 460 461 462 463 464 465
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

466 467
// const void *key, size_t keyLen, void *value
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) { freeTableCachedVal(value); }
468

469 470 471 472 473
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
  for (int32_t j = 0; j < numOfExpr; ++j) {
    int32_t dstSlotId = pExpr[j].base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
474
    colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
475 476 477
  }
}

478 479
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
480
  // currently only the tbname pseudo column
481
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
482
    return TSDB_CODE_SUCCESS;
483 484
  }

485 486
  int32_t code = 0;

487 488 489 490
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

491
  bool            freeReader = false;
492
  STableCachedVal val = {0};
493 494

  SMetaReader mr = {0};
495
  LRUHandle*  h = NULL;
496

497 498 499
  // todo refactor: extract method
  // the handling of the null data should be packed in the extracted method

500
  // 1. check if it is existed in meta cache
501
  if (pCache == NULL) {
502
    metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
503
    code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
504
    if (code != TSDB_CODE_SUCCESS) {
505
      // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
H
Haojun Liao 已提交
506
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
S
slzhou 已提交
507 508
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);
509 510 511

        // append null value before return to caller, since the caller will ignore this error code and proceed
        doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
512
      } else {
S
slzhou 已提交
513 514
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
H
Haojun Liao 已提交
515
      }
516 517 518 519 520
      metaReaderClear(&mr);
      return terrno;
    }

    metaReaderReleaseLock(&mr);
521

522 523
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
524 525

    freeReader = true;
526
  } else {
527 528
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
529
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
530 531
    if (h == NULL) {
      metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
532
      code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
533
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
534
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
535
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
536
                pBlock->info.id.uid, tstrerror(terrno), idStr);
537 538
          // append null value before return to caller, since the caller will ignore this error code and proceed
          doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
539
        } else {
H
Haojun Liao 已提交
540
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
541
                 idStr);
H
Haojun Liao 已提交
542
        }
543 544 545 546 547 548
        metaReaderClear(&mr);
        return terrno;
      }

      metaReaderReleaseLock(&mr);

H
Haojun Liao 已提交
549
      STableCachedVal* pVal = createTableCacheVal(&mr);
550

H
Haojun Liao 已提交
551
      val = *pVal;
552
      freeReader = true;
H
Haojun Liao 已提交
553

H
Haojun Liao 已提交
554
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
555
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
556 557 558 559 560 561 562 563
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
564

H
Haojun Liao 已提交
565
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
566
    }
H
Haojun Liao 已提交
567

568 569
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
570
  }
571

572 573
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
574
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
575 576

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
577
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
578

579
    int32_t functionId = pExpr1->pExpr->_function.functionId;
580 581 582

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
583
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
584
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
585
      STagVal tagVal = {0};
586 587
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
588

589 590 591 592
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
593
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
594
      }
595

H
Haojun Liao 已提交
596 597
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
598
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
599
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
D
dapan1121 已提交
600
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
H
Haojun Liao 已提交
601 602 603
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
D
dapan1121 已提交
604 605 606 607 608 609
        if (code) {
          if (freeReader) {
            metaReaderClear(&mr);
          }
          return code;
        }
L
Liu Jicong 已提交
610
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
611
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
612
          colDataSetVal(pColInfoData, i, data, false);
H
Haojun Liao 已提交
613
        }
614 615 616 617
      }
    }
  }

618 619
  // restore the rows
  pBlock->info.rows = backupRows;
620 621 622 623
  if (freeReader) {
    metaReaderClear(&mr);
  }

H
Haojun Liao 已提交
624
  return TSDB_CODE_SUCCESS;
625 626
}

H
Haojun Liao 已提交
627
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
628 629 630
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

H
Haojun Liao 已提交
631
  size_t len = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
632
  char   buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
H
Haojun Liao 已提交
633 634 635
  STR_TO_VARSTR(buf, name)

  SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, len, 1);
636

H
Haojun Liao 已提交
637
  colInfoDataEnsureCapacity(&infoData, 1, false);
638
  colDataSetVal(&infoData, 0, buf, false);
639

H
Haojun Liao 已提交
640
  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData};
641
  SScalarParam param = {.columnData = pColInfoData};
H
Haojun Liao 已提交
642 643 644 645 646 647 648

  if (fpSet.process != NULL) {
    fpSet.process(&srcParam, 1, &param);
  } else {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  }

D
dapan1121 已提交
649
  colDataDestroy(&infoData);
650 651
}

652
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
653
  STableScanInfo* pTableScanInfo = pOperator->info;
654
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
L
Liu Jicong 已提交
655
  SSDataBlock*    pBlock = pTableScanInfo->pResBlock;
D
dapan1121 已提交
656 657
  bool            hasNext = false;
  int32_t         code = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
658

659 660
  int64_t st = taosGetTimestampUs();

D
dapan1121 已提交
661 662 663 664 665 666 667 668 669 670
  while (true) {
    code = tsdbNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
    if (code) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
671

672
    if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
673
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
674
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
675
    }
H
Haojun Liao 已提交
676

677
    if (pOperator->status == OP_EXEC_DONE) {
X
Xiaoyu Wang 已提交
678
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
679 680 681
      break;
    }

682 683 684 685 686 687
    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

D
dapan1121 已提交
688
    if (pBlock->info.id.uid) {
689
      pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
D
dapan1121 已提交
690
    }
691

692
    uint32_t status = 0;
693
    code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
694
    if (code != TSDB_CODE_SUCCESS) {
695
      T_LONG_JMP(pTaskInfo->env, code);
696
    }
697

698 699 700
    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
701
    }
702

H
Haojun Liao 已提交
703 704
    pOperator->resultInfo.totalRows = pTableScanInfo->base.readRecorder.totalRows;
    pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
705

H
Haojun Liao 已提交
706
    pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;
707 708

    // todo refactor
H
Haojun Liao 已提交
709
    /*pTableScanInfo->lastStatus.uid = pBlock->info.id.uid;*/
L
Liu Jicong 已提交
710
    /*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/
X
Xiaoyu Wang 已提交
711 712 713
    //    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
    //    pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
    //    pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
714

715
    return pBlock;
H
Haojun Liao 已提交
716 717 718 719
  }
  return NULL;
}

H
Haojun Liao 已提交
720
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
721 722 723 724
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
H
Haojun Liao 已提交
725
  if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
H
Haojun Liao 已提交
726 727 728
    return NULL;
  }

729 730
  // do the ascending order traverse in the first place.
  while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
H
Haojun Liao 已提交
731 732 733
    SSDataBlock* p = doTableScanImpl(pOperator);
    if (p != NULL) {
      return p;
H
Haojun Liao 已提交
734 735
    }

736
    pTableScanInfo->scanTimes += 1;
737

738
    if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
739
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
G
Ganlin Zhao 已提交
740 741
      pTableScanInfo->base.scanFlag = MAIN_SCAN;
      pTableScanInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
742
      qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
743

744
      // do prepare for the next round table scan operation
H
Haojun Liao 已提交
745
      tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
H
Haojun Liao 已提交
746
    }
747
  }
H
Haojun Liao 已提交
748

749
  int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
750
  if (pTableScanInfo->scanTimes < total) {
H
Haojun Liao 已提交
751 752 753
    if (pTableScanInfo->base.cond.order == TSDB_ORDER_ASC) {
      prepareForDescendingScan(&pTableScanInfo->base, pOperator->exprSupp.pCtx, 0);
      tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
754
      qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
755
    }
H
Haojun Liao 已提交
756

757
    while (pTableScanInfo->scanTimes < total) {
H
Haojun Liao 已提交
758 759 760
      SSDataBlock* p = doTableScanImpl(pOperator);
      if (p != NULL) {
        return p;
761
      }
H
Haojun Liao 已提交
762

763
      pTableScanInfo->scanTimes += 1;
H
Haojun Liao 已提交
764

765
      if (pTableScanInfo->scanTimes < total) {
766
        setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
G
Ganlin Zhao 已提交
767
        pTableScanInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
768

769
        qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
770
        tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
771
      }
H
Haojun Liao 已提交
772 773 774
    }
  }

wmmhello's avatar
wmmhello 已提交
775 776 777 778 779 780 781
  return NULL;
}

static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

782
  // scan table one by one sequentially
L
Liu Jicong 已提交
783
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
X
Xiaoyu Wang 已提交
784
    int32_t       numOfTables = 0;  // tableListGetSize(pTaskInfo->pTableListInfo);
785
    STableKeyInfo tInfo = {0};
H
Haojun Liao 已提交
786

L
Liu Jicong 已提交
787
    while (1) {
H
Haojun Liao 已提交
788
      SSDataBlock* result = doGroupedTableScan(pOperator);
H
Haojun Liao 已提交
789
      if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
L
Liu Jicong 已提交
790 791
        return result;
      }
H
Haojun Liao 已提交
792

L
Liu Jicong 已提交
793 794
      // if no data, switch to next table and continue scan
      pInfo->currentTable++;
795 796

      taosRLockLatch(&pTaskInfo->lock);
797
      numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
798

H
Haojun Liao 已提交
799
      if (pInfo->currentTable >= numOfTables) {
H
Haojun Liao 已提交
800
        qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
801
        taosRUnLockLatch(&pTaskInfo->lock);
L
Liu Jicong 已提交
802 803
        return NULL;
      }
H
Haojun Liao 已提交
804

X
Xiaoyu Wang 已提交
805
      tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
806 807 808 809
      taosRUnLockLatch(&pTaskInfo->lock);

      tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
H
Haojun Liao 已提交
810
             pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
811

H
Haojun Liao 已提交
812
      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
L
Liu Jicong 已提交
813 814
      pInfo->scanTimes = 0;
    }
815 816
  } else {  // scan table group by group sequentially
    if (pInfo->currentGroupId == -1) {
817
      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
H
Haojun Liao 已提交
818
        setOperatorCompleted(pOperator);
819 820
        return NULL;
      }
821

5
54liuyao 已提交
822
      int32_t        num = 0;
823
      STableKeyInfo* pList = NULL;
824
      tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
H
Haojun Liao 已提交
825
      ASSERT(pInfo->base.dataReader == NULL);
826

L
Liu Jicong 已提交
827
      int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
D
dapan1121 已提交
828
                                    (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly);
829 830 831
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }
832 833 834 835

      if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
        pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
      }
wmmhello's avatar
wmmhello 已提交
836
    }
H
Haojun Liao 已提交
837

H
Haojun Liao 已提交
838
    SSDataBlock* result = doGroupedTableScan(pOperator);
839 840 841
    if (result != NULL) {
      return result;
    }
H
Haojun Liao 已提交
842

843
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
H
Haojun Liao 已提交
844
      setOperatorCompleted(pOperator);
845 846
      return NULL;
    }
wmmhello's avatar
wmmhello 已提交
847

848 849
    // reset value for the next group data output
    pOperator->status = OP_OPENED;
850
    resetLimitInfoForNextGroup(&pInfo->base.limitInfo);
wmmhello's avatar
wmmhello 已提交
851

5
54liuyao 已提交
852
    int32_t        num = 0;
853
    STableKeyInfo* pList = NULL;
854
    tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
wmmhello's avatar
wmmhello 已提交
855

H
Haojun Liao 已提交
856 857
    tsdbSetTableList(pInfo->base.dataReader, pList, num);
    tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
858
    pInfo->scanTimes = 0;
wmmhello's avatar
wmmhello 已提交
859

H
Haojun Liao 已提交
860
    result = doGroupedTableScan(pOperator);
861 862 863
    if (result != NULL) {
      return result;
    }
864

H
Haojun Liao 已提交
865
    setOperatorCompleted(pOperator);
866 867
    return NULL;
  }
H
Haojun Liao 已提交
868 869
}

870 871
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
872
  STableScanInfo*         pTableScanInfo = pOptr->info;
H
Haojun Liao 已提交
873
  *pRecorder = pTableScanInfo->base.readRecorder;
874 875 876 877 878
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

879 880
static void destroyTableScanBase(STableScanBase* pBase) {
  cleanupQueryTableDataCond(&pBase->cond);
H
Haojun Liao 已提交
881

882 883
  tsdbReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;
884

885 886
  if (pBase->matchInfo.pList != NULL) {
    taosArrayDestroy(pBase->matchInfo.pList);
887
  }
L
Liu Jicong 已提交
888

889
  tableListDestroy(pBase->pTableListInfo);
890 891 892 893 894 895 896 897
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}

static void destroyTableScanOperatorInfo(void* param) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  blockDataDestroy(pTableScanInfo->pResBlock);
  destroyTableScanBase(&pTableScanInfo->base);
D
dapan1121 已提交
898
  taosMemoryFreeClear(param);
899 900
}

901
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
902
                                           STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
X
Xiaoyu Wang 已提交
903
  int32_t         code = 0;
H
Haojun Liao 已提交
904 905 906
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
907
    code = TSDB_CODE_OUT_OF_MEMORY;
908
    goto _error;
H
Haojun Liao 已提交
909 910
  }

911
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
912
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
913 914

  int32_t numOfCols = 0;
X
Xiaoyu Wang 已提交
915 916
  code =
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
917 918 919 920
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
921
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
922
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
923
  if (code != TSDB_CODE_SUCCESS) {
924
    goto _error;
925 926
  }

H
Haojun Liao 已提交
927
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
928
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
929
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
930
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
931 932
  }

933
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
G
Ganlin Zhao 已提交
934
  pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
H
Haojun Liao 已提交
935

H
Haojun Liao 已提交
936 937
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
938 939
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

940 941
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
942

H
Haojun Liao 已提交
943
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
944
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
X
Xiaoyu Wang 已提交
945
  //  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
946

H
Haojun Liao 已提交
947 948 949
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
950 951
  }

wmmhello's avatar
wmmhello 已提交
952
  pInfo->currentGroupId = -1;
953
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
954
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
955

L
Liu Jicong 已提交
956 957
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
958
  pOperator->exprSupp.numOfExprs = numOfCols;
959

960
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
961 962
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
963 964 965
    code = terrno;
    goto _error;
  }
966

D
dapan1121 已提交
967 968 969 970
  if (scanDebug) {
    pInfo->countOnly = true;
  }

H
Haojun Liao 已提交
971
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
972 973
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
974 975 976

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
977
  return pOperator;
978

979
_error:
980 981 982
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
983

984 985
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
986
  return NULL;
H
Haojun Liao 已提交
987 988
}

989
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
990
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
L
Liu Jicong 已提交
991
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
992

H
Haojun Liao 已提交
993
  pInfo->base.dataReader = pReadHandle;
L
Liu Jicong 已提交
994
  //  pInfo->prevGroupId       = -1;
H
Haojun Liao 已提交
995

L
Liu Jicong 已提交
996 997
  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
998
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
999 1000 1001
  return pOperator;
}

1002
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
5
54liuyao 已提交
1003
  qDebug("clear buff blocks:%d", (int32_t)taosArrayGetSize(pInfo->pBlockLists));
L
Liu Jicong 已提交
1004 1005
  taosArrayClear(pInfo->pBlockLists);
  pInfo->validBlockIndex = 0;
H
Haojun Liao 已提交
1006 1007
}

1008
static bool isSessionWindow(SStreamScanInfo* pInfo) {
H
Haojun Liao 已提交
1009
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
5
54liuyao 已提交
1010 1011
}

1012
static bool isStateWindow(SStreamScanInfo* pInfo) {
1013
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
5
54liuyao 已提交
1014
}
5
54liuyao 已提交
1015

L
Liu Jicong 已提交
1016
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
1017 1018 1019
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
         pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
         pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
5
54liuyao 已提交
1020 1021 1022
}

static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
1023
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
L
Liu Jicong 已提交
1024 1025
}

1026 1027 1028 1029
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  return isIntervalWindow(pInfo) && pInfo->interval.interval != pInfo->interval.sliding;
}

1030
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
1031 1032
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  uint64_t*        groupCol = (uint64_t*)pColInfo->pData;
1033
  ASSERT(rowIndex < pBlock->info.rows);
1034
  pInfo->groupId = groupCol[rowIndex];
1035 1036
}

L
Liu Jicong 已提交
1037
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
H
Haojun Liao 已提交
1038
  pTableScanInfo->base.cond.twindows = *pWin;
L
Liu Jicong 已提交
1039 1040
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;
H
Haojun Liao 已提交
1041
  tsdbReaderClose(pTableScanInfo->base.dataReader);
H
Haojun Liao 已提交
1042
  qDebug("1");
H
Haojun Liao 已提交
1043
  pTableScanInfo->base.dataReader = NULL;
1044 1045
}

L
Liu Jicong 已提交
1046 1047
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
1048
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
1049

1050
  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
H
Haojun Liao 已提交
1051
  SQueryTableDataCond cond = pTableScanInfo->base.cond;
1052 1053 1054 1055 1056 1057 1058 1059 1060

  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  STsdbReader* pReader = NULL;
L
Liu Jicong 已提交
1061
  int32_t      code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
D
dapan1121 已提交
1062
                                     (STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false);
1063 1064
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
dengyihao's avatar
dengyihao 已提交
1065
    T_LONG_JMP(pTaskInfo->env, code);
1066 1067 1068
    return NULL;
  }

D
dapan1121 已提交
1069 1070 1071 1072 1073 1074 1075 1076 1077
  bool hasNext = false;
  code = tsdbNextDataBlock(pReader, &hasNext);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  if (hasNext) {
L
Liu Jicong 已提交
1078
    /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
H
Haojun Liao 已提交
1079
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
1080
    pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
1081 1082 1083
  }

  tsdbReaderClose(pReader);
D
dapan1121 已提交
1084
  qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
5
54liuyao 已提交
1085 1086
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
1087 1088

  return pBlock->info.rows > 0 ? pBlock : NULL;
1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099
}

static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (!pPreRes || pPreRes->info.rows == 0) {
    return 0;
  }
  ASSERT(pPreRes->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, 0);
}

5
54liuyao 已提交
1100
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
1101
  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
1102
  return getTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
1103 1104
}

5
54liuyao 已提交
1105 1106 1107 1108 1109 1110 1111 1112
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  if (pInfo->partitionSup.needCalc) {
    return getGroupIdByCol(pInfo, uid, ts, maxVersion);
  }

  return getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1113
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
5
54liuyao 已提交
1114 1115 1116
  if (pBlock->info.rows == 0) {
    return false;
  }
L
Liu Jicong 已提交
1117 1118 1119 1120 1121 1122 1123 1124 1125 1126
  if ((*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  STimeWindow      win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
1127 1128 1129
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        gpData = (uint64_t*)pGpCol->pData;
  uint64_t         groupId = gpData[*pRowIndex];
1130 1131 1132 1133 1134 1135

  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSKEY*           calStartData = (TSKEY*)pCalStartTsCol->pData;
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSKEY*           calEndData = (TSKEY*)pCalEndTsCol->pData;

L
Liu Jicong 已提交
1136
  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
1137 1138 1139 1140
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[*pRowIndex];
    pInfo->updateWin.ekey = calEndData[*pRowIndex];
  }
L
Liu Jicong 已提交
1141 1142 1143
  (*pRowIndex)++;

  for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
1144
    if (win.skey == startData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
L
Liu Jicong 已提交
1145 1146 1147
      win.ekey = TMAX(win.ekey, endData[*pRowIndex]);
      continue;
    }
1148
    if (win.skey == endData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
L
Liu Jicong 已提交
1149 1150 1151
      win.skey = TMIN(win.skey, startData[*pRowIndex]);
      continue;
    }
1152 1153
    ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
           !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
L
Liu Jicong 已提交
1154 1155 1156 1157
    break;
  }

  resetTableScanInfo(pInfo->pTableScanOp->info, &win);
1158
  pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1159 1160 1161
  return true;
}

5
54liuyao 已提交
1162
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1163
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1164
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1165
  dumyInfo.cur.pageId = -1;
1166
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1167 1168
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1169
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1170

5
54liuyao 已提交
1171
  while (1) {
1172 1173 1174
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1175
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1176 1177 1178 1179 1180
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1181
    }
5
54liuyao 已提交
1182

5
54liuyao 已提交
1183 1184 1185
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1186
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1187
    endWin = preWin;
5
54liuyao 已提交
1188
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1189 1190 1191 1192 1193 1194
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1195

L
Liu Jicong 已提交
1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  while (1) {
    SSDataBlock* pResult = NULL;
    pResult = doTableScan(pInfo->pTableScanOp);
    if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
      // scan next window data
      pResult = doTableScan(pInfo->pTableScanOp);
    }
    if (!pResult) {
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
5
54liuyao 已提交
1207
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
H
Hongze Cheng 已提交
1208
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1209
      tsdbReaderClose(pTableScanInfo->base.dataReader);
H
Haojun Liao 已提交
1210
      qDebug("2");
H
Haojun Liao 已提交
1211
      pTableScanInfo->base.dataReader = NULL;
1212 1213
      return NULL;
    }
L
Liu Jicong 已提交
1214

H
Haojun Liao 已提交
1215
    doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
1216 1217 1218 1219
    if (pResult->info.rows == 0) {
      continue;
    }

1220 1221 1222 1223 1224 1225 1226 1227
    if (pInfo->partitionSup.needCalc) {
      SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
      blockDataCleanup(pResult);
      for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
        if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, tmpBlock, i) == pInfo->groupId) {
          for (int32_t j = 0; j < pInfo->pTableScanOp->exprSupp.numOfExprs; j++) {
            SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
            SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
L
Liu Jicong 已提交
1228 1229
            bool             isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
            char*            pSrcData = colDataGetData(pSrcCol, i);
1230
            colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull);
1231 1232 1233 1234
          }
          pResult->info.rows++;
        }
      }
H
Haojun Liao 已提交
1235 1236 1237

      blockDataDestroy(tmpBlock);

1238 1239 1240 1241
      if (pResult->info.rows > 0) {
        pResult->info.calWin = pInfo->updateWin;
        return pResult;
      }
H
Haojun Liao 已提交
1242
    } else if (pResult->info.id.groupId == pInfo->groupId) {
5
54liuyao 已提交
1243
      pResult->info.calWin = pInfo->updateWin;
1244
      return pResult;
5
54liuyao 已提交
1245 1246
    }
  }
1247
}
1248

1249
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
X
Xiaoyu Wang 已提交
1250
                                   SSessionKey* pKey) {
1251 1252 1253
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
X
Xiaoyu Wang 已提交
1254

1255 1256 1257 1258 1259
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
  int32_t          code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
H
Haojun Liao 已提交
1260 1261

  taosMemoryFree(pCur);
1262 1263 1264
  return code;
}

1265
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
5
54liuyao 已提交
1266
  blockDataCleanup(pDestBlock);
1267 1268
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
1269
  }
1270
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
1271
  if (code != TSDB_CODE_SUCCESS) {
1272
    return code;
L
Liu Jicong 已提交
1273
  }
1274 1275
  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
1276
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
1277
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
1278
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
1279 1280
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
L
Liu Jicong 已提交
1281

1282 1283
  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
5
54liuyao 已提交
1284
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
1285
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
5
54liuyao 已提交
1286 1287
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
1288
  int64_t          version = pSrcBlock->info.version - 1;
1289
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
1290
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
L
Liu Jicong 已提交
1291
    // gap must be 0.
5
54liuyao 已提交
1292
    SSessionKey startWin = {0};
1293
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
5
54liuyao 已提交
1294
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
L
Liu Jicong 已提交
1295 1296 1297
      // window has been closed.
      continue;
    }
5
54liuyao 已提交
1298 1299
    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
X
Xiaoyu Wang 已提交
1300
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
1301 1302 1303 1304
      getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    }
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
X
Xiaoyu Wang 已提交
1305
      qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
1306 1307
      continue;
    }
1308 1309
    colDataSetVal(pDestStartCol, i, (const char*)&startWin.win.skey, false);
    colDataSetVal(pDestEndCol, i, (const char*)&endWin.win.ekey, false);
5
54liuyao 已提交
1310

1311
    colDataSetNULL(pDestUidCol, i);
1312
    colDataSetVal(pDestGpCol, i, (const char*)&groupId, false);
1313 1314
    colDataSetNULL(pDestCalStartTsCol, i);
    colDataSetNULL(pDestCalEndTsCol, i);
1315
    pDestBlock->info.rows++;
L
Liu Jicong 已提交
1316
  }
1317
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
1318
}
1319 1320 1321 1322 1323 1324

static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
1325
  }
1326

1327 1328
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
1329 1330
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
5
54liuyao 已提交
1331

L
Liu Jicong 已提交
1332
  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
1333
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
5
54liuyao 已提交
1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
    uint64_t     srcUid = srcUidData[0];
    TSKEY        startTs = srcStartTsCol[0];
    TSKEY        endTs = srcEndTsCol[0];
    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);
    printDataBlock(pPreRes, "pre res");
    blockDataCleanup(pSrcBlock);
    int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    rows = pPreRes->info.rows;

    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
      appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
                                       &groupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  srcUidData = (uint64_t*)pSrcUidCol->pData;

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1370 1371
  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
1372
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
1373 1374 1375
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
1376
  for (int32_t i = 0; i < rows;) {
1377
    uint64_t srcUid = srcUidData[i];
5
54liuyao 已提交
1378 1379 1380 1381 1382
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
    TSKEY calStartTs = srcStartTsCol[i];
1383
    colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
5
54liuyao 已提交
1384
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
1385 1386
                                       pInfo->partitionSup.needCalc);
    TSKEY       calEndTs = srcStartTsCol[i - 1];
1387 1388 1389 1390 1391
    colDataSetVal(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataSetVal(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
1392
    pDestBlock->info.rows++;
5
54liuyao 已提交
1393
  }
1394 1395
  return TSDB_CODE_SUCCESS;
}
1396

1397
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
5
54liuyao 已提交
1398 1399 1400
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
1401 1402
    return TSDB_CODE_SUCCESS;
  }
5
54liuyao 已提交
1403
  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
1404 1405 1406 1407
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

5
54liuyao 已提交
1408 1409 1410 1411 1412 1413 1414 1415 1416 1417
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;
1418
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
5
54liuyao 已提交
1419 1420
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
L
Liu Jicong 已提交
1421
    char*    tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
5
54liuyao 已提交
1422 1423 1424
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
L
Liu Jicong 已提交
1425
    if (pInfo->tbnameCalSup.pExprInfo) {
1426 1427 1428
      void* parTbname = NULL;
      streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);

L
Liu Jicong 已提交
1429 1430
      memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
      varDataSetLen(tbname, strlen(varDataVal(tbname)));
L
Liu Jicong 已提交
1431
      tdbFree(parTbname);
L
Liu Jicong 已提交
1432 1433 1434
    }
    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     tbname[0] == 0 ? NULL : tbname);
1435 1436 1437 1438
  }
  return TSDB_CODE_SUCCESS;
}

1439 1440 1441 1442
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (isIntervalWindow(pInfo)) {
    code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
1443
  } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
1444
    code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
5
54liuyao 已提交
1445 1446
  } else {
    code = generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
1447
  }
1448
  pDestBlock->info.type = STREAM_CLEAR;
1449
  pDestBlock->info.version = pSrcBlock->info.version;
1450
  pDestBlock->info.dataLoad = 1;
1451 1452 1453 1454
  blockDataUpdateTsWindow(pDestBlock, 0);
  return code;
}

L
Liu Jicong 已提交
1455 1456 1457
#if 0
void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp*    pTagCalSup = &pInfo->tagCalSup;
1458
  SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
L
Liu Jicong 已提交
1459
  if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return;
L
Liu Jicong 已提交
1460
  if (pBlock == NULL || pBlock->info.rows == 0) return;
1461

L
Liu Jicong 已提交
1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477
  void*   tag = NULL;
  int32_t tagLen = 0;
  if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) == 0) {
    pBlock->info.tagLen = tagLen;
    void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen);
    if (pTag == NULL) {
      tdbFree(tag);
      taosMemoryFree(pBlock->info.pTag);
      pBlock->info.pTag = NULL;
      pBlock->info.tagLen = 0;
      return;
    }
    pBlock->info.pTag = pTag;
    memcpy(pBlock->info.pTag, tag, tagLen);
    tdbFree(tag);
    return;
L
Liu Jicong 已提交
1478
  } else {
L
Liu Jicong 已提交
1479
    pBlock->info.pTag = NULL;
L
Liu Jicong 已提交
1480
  }
L
Liu Jicong 已提交
1481 1482 1483
  tdbFree(tag);
}
#endif
L
Liu Jicong 已提交
1484

5
54liuyao 已提交
1485
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
1486 1487
  SExprSupp*    pTbNameCalSup = &pInfo->tbnameCalSup;
  SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
5
54liuyao 已提交
1488 1489
  blockDataCleanup(pInfo->pCreateTbRes);
  if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
L
Liu Jicong 已提交
1490
    pBlock->info.parTbName[0] = 0;
L
Liu Jicong 已提交
1491
  } else {
5
54liuyao 已提交
1492 1493
    appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
                         pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes);
L
Liu Jicong 已提交
1494
  }
L
Liu Jicong 已提交
1495 1496
}

1497 1498
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
1499 1500
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
1501 1502
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
1503 1504
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
1505
  SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
1506 1507 1508 1509 1510 1511 1512
  colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
  colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
  colDataSetVal(pUidCol, pBlock->info.rows, (const char*)pUid, false);
  colDataSetVal(pGpCol, pBlock->info.rows, (const char*)pGp, false);
  colDataSetVal(pCalStartCol, pBlock->info.rows, (const char*)pStartTs, false);
  colDataSetVal(pCalEndCol, pBlock->info.rows, (const char*)pEndTs, false);
  colDataSetVal(pTableCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL);
1513
  pBlock->info.rows++;
5
54liuyao 已提交
1514 1515
}

1516
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
1517 1518
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
1519
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
1520
  }
1521 1522
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
5
54liuyao 已提交
1523
  TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
H
Haojun Liao 已提交
1524
  bool   tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);
1525
  for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
5
54liuyao 已提交
1526 1527
    SResultRowInfo dumyInfo;
    dumyInfo.cur.pageId = -1;
L
Liu Jicong 已提交
1528
    bool        isClosed = false;
5
54liuyao 已提交
1529
    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
X
Xiaoyu Wang 已提交
1530
    bool        overDue = isOverdue(tsCol[rowId], &pInfo->twAggSup);
1531 1532 1533 1534 1535
    if (pInfo->igExpired && overDue) {
      continue;
    }

    if (tableInserted && overDue) {
5
54liuyao 已提交
1536 1537 1538
      win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }
5
54liuyao 已提交
1539
    // must check update info first.
H
Haojun Liao 已提交
1540
    bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]);
L
Liu Jicong 已提交
1541
    bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
H
Haojun Liao 已提交
1542
                     isDeletedStreamWindow(&win, pBlock->info.id.groupId,
1543
                                           pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup);
L
Liu Jicong 已提交
1544
    if ((update || closedWin) && out) {
L
Liu Jicong 已提交
1545
      qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
5
54liuyao 已提交
1546
      uint64_t gpId = 0;
H
Haojun Liao 已提交
1547
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId,
1548
                                       NULL);
5
54liuyao 已提交
1549 1550
      if (closedWin && pInfo->partitionSup.needCalc) {
        gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
S
slzhou 已提交
1551 1552
        appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid,
                                         &gpId, NULL);
5
54liuyao 已提交
1553
      }
1554 1555
    }
  }
1556 1557
  if (out && pInfo->pUpdateDataRes->info.rows > 0) {
    pInfo->pUpdateDataRes->info.version = pBlock->info.version;
1558
    pInfo->pUpdateDataRes->info.dataLoad = 1;
1559
    blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
1560
    pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
5
54liuyao 已提交
1561 1562
  }
}
L
Liu Jicong 已提交
1563

1564
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
L
Liu Jicong 已提交
1565 1566
  SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
L
Liu Jicong 已提交
1567
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
L
Liu Jicong 已提交
1568

1569 1570
  blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);

L
Liu Jicong 已提交
1571
  pInfo->pRes->info.rows = pBlock->info.rows;
H
Haojun Liao 已提交
1572
  pInfo->pRes->info.id.uid = pBlock->info.id.uid;
L
Liu Jicong 已提交
1573
  pInfo->pRes->info.type = STREAM_NORMAL;
1574
  pInfo->pRes->info.version = pBlock->info.version;
L
Liu Jicong 已提交
1575

1576
  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
1577
  pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
L
Liu Jicong 已提交
1578 1579

  // todo extract method
H
Haojun Liao 已提交
1580 1581 1582
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
L
Liu Jicong 已提交
1583 1584 1585 1586 1587 1588 1589
      continue;
    }

    bool colExists = false;
    for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
      SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
      if (pResCol->info.colId == pColMatchInfo->colId) {
H
Haojun Liao 已提交
1590
        SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
1591
        colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
L
Liu Jicong 已提交
1592 1593 1594 1595 1596 1597 1598
        colExists = true;
        break;
      }
    }

    // the required column does not exists in submit block, let's set it to be all null value
    if (!colExists) {
H
Haojun Liao 已提交
1599
      SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
1600
      colDataSetNNULL(pDst, 0, pBlockInfo->rows);
L
Liu Jicong 已提交
1601 1602 1603 1604 1605
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
L
Liu Jicong 已提交
1606
    int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
1607
                                          pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
K
kailixu 已提交
1608 1609
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
L
Liu Jicong 已提交
1610
      blockDataFreeRes((SSDataBlock*)pBlock);
1611
      T_LONG_JMP(pTaskInfo->env, code);
H
Haojun Liao 已提交
1612
    }
K
kailixu 已提交
1613 1614 1615

    // reset the error code.
    terrno = 0;
L
Liu Jicong 已提交
1616 1617
  }

1618
  if (filter) {
H
Haojun Liao 已提交
1619
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1620
  }
1621

1622
  pInfo->pRes->info.dataLoad = 1;
L
Liu Jicong 已提交
1623
  blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
L
Liu Jicong 已提交
1624
  blockDataFreeRes((SSDataBlock*)pBlock);
L
Liu Jicong 已提交
1625

L
Liu Jicong 已提交
1626
  calBlockTbName(pInfo, pInfo->pRes);
L
Liu Jicong 已提交
1627 1628
  return 0;
}
5
54liuyao 已提交
1629

L
Liu Jicong 已提交
1630
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
1631 1632
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;
X
Xiaoyu Wang 已提交
1633
  const char*      id = GET_TASKID(pTaskInfo);
H
Haojun Liao 已提交
1634

1635
  qDebug("start to exec queue scan, %s", id);
L
Liu Jicong 已提交
1636

L
Liu Jicong 已提交
1637
  if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
1638
    if (pInfo->tqReader->msg2.msgStr == NULL) {
L
Liu Jicong 已提交
1639
      SPackedData submit = pTaskInfo->streamInfo.submit;
1640
      if (tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
L
Liu Jicong 已提交
1641
        qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
1642
        return NULL;
L
Liu Jicong 已提交
1643 1644 1645 1646 1647 1648
      }
    }

    blockDataCleanup(pInfo->pRes);
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

1649
    while (tqNextBlockImpl(pInfo->tqReader)) {
L
Liu Jicong 已提交
1650 1651
      SSDataBlock block = {0};

1652
      int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader, NULL);
L
Liu Jicong 已提交
1653 1654 1655 1656
      if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
        continue;
      }

1657
      setBlockIntoRes(pInfo, &block, true);
L
Liu Jicong 已提交
1658 1659 1660 1661 1662 1663

      if (pBlockInfo->rows > 0) {
        return pInfo->pRes;
      }
    }

L
Liu Jicong 已提交
1664 1665
    pInfo->tqReader->msg2 = (SPackedData){0};
    pTaskInfo->streamInfo.submit = (SPackedData){0};
L
Liu Jicong 已提交
1666
    return NULL;
L
Liu Jicong 已提交
1667 1668
  }

1669
  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
L
Liu Jicong 已提交
1670 1671
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
X
Xiaoyu Wang 已提交
1672 1673 1674
      qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64,
             pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey,
             pInfo->tqReader->pWalReader->curVersion);
1675
      tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
L
Liu Jicong 已提交
1676
      return pResult;
1677
    }
1678 1679 1680 1681 1682 1683
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;
    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
    if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
      return NULL;
1684
    }
wmmhello's avatar
wmmhello 已提交
1685
    tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
1686 1687
  }

1688
  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
L
Liu Jicong 已提交
1689
    while (1) {
1690 1691 1692 1693 1694 1695 1696 1697
      SSDataBlock block = {0};
      int32_t type = tqNextBlock(pInfo->tqReader, &block);

      // curVersion move to next, so currentOffset = curVersion - 1
      tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1);

      if (type == FETCH_TYPE__DATA) {
        qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, block.info.rows,
X
Xiaoyu Wang 已提交
1698
               pTaskInfo->streamInfo.currentOffset.version);
L
Liu Jicong 已提交
1699
        blockDataCleanup(pInfo->pRes);
1700
        setBlockIntoRes(pInfo, &block, true);
L
Liu Jicong 已提交
1701
        if (pInfo->pRes->info.rows > 0) {
X
Xiaoyu Wang 已提交
1702 1703
          qDebug("doQueueScan get data from log %" PRId64 " rows, return, version:%" PRId64, pInfo->pRes->info.rows,
                 pTaskInfo->streamInfo.currentOffset.version);
L
Liu Jicong 已提交
1704 1705
          return pInfo->pRes;
        }
1706
      } else if (type == FETCH_TYPE__NONE) {
wmmhello's avatar
wmmhello 已提交
1707
        qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
L
Liu Jicong 已提交
1708 1709 1710
        return NULL;
      }
    }
L
Liu Jicong 已提交
1711
  } else {
1712
    qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
L
Liu Jicong 已提交
1713
    return NULL;
H
Haojun Liao 已提交
1714
  }
L
Liu Jicong 已提交
1715 1716
}

L
Liu Jicong 已提交
1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;
  blockDataEnsureCapacity(pDst, rows);

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  uint64_t*        startCol = (uint64_t*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  uint64_t*        endCol = (uint64_t*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  int32_t          j = 0;
  for (int32_t i = 0; i < rows; i++) {
    if (taosHashGet(pReader->tbIdHash, &uidCol[i], sizeof(uint64_t))) {
1735 1736 1737
      colDataSetVal(pDstStartCol, j, (const char*)&startCol[i], false);
      colDataSetVal(pDstEndCol, j, (const char*)&endCol[i], false);
      colDataSetVal(pDstUidCol, j, (const char*)&uidCol[i], false);
L
Liu Jicong 已提交
1738

1739 1740 1741
      colDataSetNULL(taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX), j);
      colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), j);
      colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), j);
L
Liu Jicong 已提交
1742 1743 1744
      j++;
    }
  }
L
Liu Jicong 已提交
1745
  uint32_t cap = pDst->info.capacity;
L
Liu Jicong 已提交
1746 1747
  pDst->info = pSrc->info;
  pDst->info.rows = j;
L
Liu Jicong 已提交
1748
  pDst->info.capacity = cap;
L
Liu Jicong 已提交
1749 1750 1751 1752

  return 0;
}

5
54liuyao 已提交
1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764
// for partition by tag
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startTsCol = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        gpCol = (uint64_t*)pGpCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;
  if (!pInfo->partitionSup.needCalc) {
    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
1765
      colDataSetVal(pGpCol, i, (const char*)&groupId, false);
5
54liuyao 已提交
1766 1767 1768 1769
    }
  }
}

5
54liuyao 已提交
1770
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
5
54liuyao 已提交
1771
  if (pInfo->pUpdateInfo) {
5
54liuyao 已提交
1772
    checkUpdateData(pInfo, true, pBlock, true);
5
54liuyao 已提交
1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
    if (pInfo->pUpdateDataRes->info.rows > 0) {
      pInfo->updateResIndex = 0;
      if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
        pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
        // return pInfo->pUpdateDataRes;
      } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
        pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
      }
5
54liuyao 已提交
1784 1785 1786 1787
    }
  }
}

L
Liu Jicong 已提交
1788 1789 1790 1791 1792
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

L
Liu Jicong 已提交
1793
  qDebug("stream scan called");
H
Haojun Liao 已提交
1794

1795 1796
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
L
Liu Jicong 已提交
1797
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1798
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
1799
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
H
Haojun Liao 已提交
1800 1801 1802 1803
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
      qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1804
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
1805
    } else {
H
Haojun Liao 已提交
1806 1807 1808 1809
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
      qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1810
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
1811
    }
L
Liu Jicong 已提交
1812 1813

    /*resetTableScanInfo(pTSInfo, pWin);*/
H
Haojun Liao 已提交
1814
    tsdbReaderClose(pTSInfo->base.dataReader);
H
Haojun Liao 已提交
1815

H
Haojun Liao 已提交
1816
    pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
1817
    pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1818

L
Liu Jicong 已提交
1819 1820
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
L
Liu Jicong 已提交
1821
    pTaskInfo->streamInfo.recoverScanFinished = false;
L
Liu Jicong 已提交
1822 1823
  }

5
54liuyao 已提交
1824 1825
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
L
Liu Jicong 已提交
1826 1827 1828 1829 1830
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }
5
54liuyao 已提交
1831 1832 1833 1834 1835 1836 1837

    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        printDataBlock(pInfo->pRecoverRes, "scan recover");
        return pInfo->pRecoverRes;
      } break;
5
54liuyao 已提交
1838 1839 1840 1841
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
1842
        printDataBlock(pInfo->pUpdateRes, "recover update");
5
54liuyao 已提交
1843 1844
        return pInfo->pUpdateRes;
      } break;
1845 1846 1847 1848 1849 1850 1851 1852 1853
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        printDataBlock(pInfo->pDeleteDataRes, "recover delete");
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
1854 1855 1856 1857 1858 1859 1860 1861
      case STREAM_SCAN_FROM_DATAREADER_RANGE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
1862
          printDataBlock(pSDB, "scan recover update");
5
54liuyao 已提交
1863 1864 1865 1866 1867 1868
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
5
54liuyao 已提交
1869 1870 1871 1872 1873 1874
      default:
        break;
    }

    pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
    if (pInfo->pRecoverRes != NULL) {
L
Liu Jicong 已提交
1875
      pInfo->blockRecoverContiCnt++;
5
54liuyao 已提交
1876
      calBlockTbName(pInfo, pInfo->pRecoverRes);
1877
      if (pInfo->pUpdateInfo) {
5
54liuyao 已提交
1878 1879 1880 1881 1882 1883
        if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
          TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
        } else {
          doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
        }
1884
      }
5
54liuyao 已提交
1885 1886
      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
1887
        printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
5
54liuyao 已提交
1888 1889
        return pInfo->pCreateTbRes;
      }
X
Xiaoyu Wang 已提交
1890
      qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
5
54liuyao 已提交
1891 1892
      printDataBlock(pInfo->pRecoverRes, "scan recover");
      return pInfo->pRecoverRes;
L
Liu Jicong 已提交
1893 1894
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
L
Liu Jicong 已提交
1895
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1896
    tsdbReaderClose(pTSInfo->base.dataReader);
H
Haojun Liao 已提交
1897

H
Haojun Liao 已提交
1898
    pTSInfo->base.dataReader = NULL;
1899

H
Haojun Liao 已提交
1900 1901
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
L
Liu Jicong 已提交
1902

L
Liu Jicong 已提交
1903
    pTaskInfo->streamInfo.recoverScanFinished = true;
L
Liu Jicong 已提交
1904 1905 1906
    return NULL;
  }

5
54liuyao 已提交
1907
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
5
54liuyao 已提交
1908
// TODO: refactor
L
Liu Jicong 已提交
1909
FETCH_NEXT_BLOCK:
L
Liu Jicong 已提交
1910
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
1911
    if (pInfo->validBlockIndex >= total) {
L
Liu Jicong 已提交
1912
      doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
1913
      /*pOperator->status = OP_EXEC_DONE;*/
H
Haojun Liao 已提交
1914 1915 1916
      return NULL;
    }

1917
    int32_t      current = pInfo->validBlockIndex++;
L
Liu Jicong 已提交
1918 1919
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
5
54liuyao 已提交
1920
    if (pBlock->info.parTbName[0]) {
H
Haojun Liao 已提交
1921
      streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
1922
    }
1923

1924
    // TODO move into scan
5
54liuyao 已提交
1925 1926
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
1927
    pBlock->info.dataLoad = 1;
1928
    blockDataUpdateTsWindow(pBlock, 0);
1929
    switch (pBlock->info.type) {
L
Liu Jicong 已提交
1930 1931 1932
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
1933 1934 1935
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
1936 1937
        copyDataBlock(pInfo->pUpdateRes, pBlock);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
1938 1939 1940
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
1941
        printDataBlock(pBlock, "stream scan delete recv");
L
Liu Jicong 已提交
1942
        SSDataBlock* pDelBlock = NULL;
L
Liu Jicong 已提交
1943
        if (pInfo->tqReader) {
L
Liu Jicong 已提交
1944
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
L
Liu Jicong 已提交
1945
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
L
Liu Jicong 已提交
1946 1947
        } else {
          pDelBlock = pBlock;
L
Liu Jicong 已提交
1948
        }
5
54liuyao 已提交
1949 1950
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
5
54liuyao 已提交
1951 1952 1953 1954 1955 1956
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
1957
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
L
Liu Jicong 已提交
1958
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
1959
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
L
Liu Jicong 已提交
1960
          printDataBlock(pDelBlock, "stream scan delete result");
H
Haojun Liao 已提交
1961 1962
          blockDataDestroy(pDelBlock);

L
Liu Jicong 已提交
1963 1964 1965 1966 1967
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1968 1969 1970
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
L
Liu Jicong 已提交
1971
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
1972 1973 1974
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
L
Liu Jicong 已提交
1975 1976 1977 1978
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
L
Liu Jicong 已提交
1979
          if (pInfo->pDeleteDataRes->info.rows > 0) {
5
54liuyao 已提交
1980
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
L
Liu Jicong 已提交
1981 1982 1983 1984
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1985
        }
1986 1987 1988
      } break;
      default:
        break;
5
54liuyao 已提交
1989
    }
1990
    // printDataBlock(pBlock, "stream scan recv");
1991
    return pBlock;
L
Liu Jicong 已提交
1992
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
L
Liu Jicong 已提交
1993
    qDebug("scan mode %d", pInfo->scanMode);
5
54liuyao 已提交
1994 1995 1996
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
5
54liuyao 已提交
1997
        doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
5
54liuyao 已提交
1998 1999 2000
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
2001 2002 2003
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
5
54liuyao 已提交
2004
      } break;
2005
      case STREAM_SCAN_FROM_DELETE_DATA: {
2006 2007 2008 2009 2010 2011 2012
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
2013 2014 2015 2016 2017 2018 2019 2020 2021 2022
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
2023
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
2024 2025
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
5
54liuyao 已提交
2026 2027
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
2028
          // printDataBlock(pSDB, "stream scan update");
L
Liu Jicong 已提交
2029
          calBlockTbName(pInfo, pSDB);
5
54liuyao 已提交
2030 2031
          return pSDB;
        }
2032
        blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
2033 2034 2035 2036
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
2037
    }
2038

2039
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
5
54liuyao 已提交
2040
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
2041 2042
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
5
54liuyao 已提交
2043 2044
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
2045
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
2046
      pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
2047
      return pInfo->pUpdateRes;
5
54liuyao 已提交
2048
    }
5
54liuyao 已提交
2049

H
Haojun Liao 已提交
2050 2051
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

2052
    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
2053

L
Liu Jicong 已提交
2054
  NEXT_SUBMIT_BLK:
2055
    while (1) {
L
Liu Jicong 已提交
2056
      if (pInfo->tqReader->msg2.msgStr == NULL) {
2057
        if (pInfo->validBlockIndex >= totBlockNum) {
5
54liuyao 已提交
2058
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
L
Liu Jicong 已提交
2059
          doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
2060
          qDebug("stream scan return empty, consume block %d", totBlockNum);
2061 2062
          return NULL;
        }
2063

L
Liu Jicong 已提交
2064 2065
        int32_t      current = pInfo->validBlockIndex++;
        SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
2066
        if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
2067 2068 2069 2070
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          continue;
        }
H
Haojun Liao 已提交
2071 2072
      }

2073 2074
      blockDataCleanup(pInfo->pRes);

2075
      while (tqNextBlockImpl(pInfo->tqReader)) {
2076
        SSDataBlock block = {0};
2077

2078
        int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader, NULL);
2079 2080 2081 2082
        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

2083
        setBlockIntoRes(pInfo, &block, false);
2084

H
Haojun Liao 已提交
2085
        if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId,
L
Liu Jicong 已提交
2086
                             pInfo->pRes->info.version)) {
2087 2088 2089 2090 2091
          printDataBlock(pInfo->pRes, "stream scan ignore");
          blockDataCleanup(pInfo->pRes);
          continue;
        }

5
54liuyao 已提交
2092 2093 2094
        if (pInfo->pCreateTbRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return pInfo->pCreateTbRes;
2095 2096
        }

5
54liuyao 已提交
2097
        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
H
Haojun Liao 已提交
2098
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
2099
        pInfo->pRes->info.dataLoad = 1;
2100 2101 2102
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
2103 2104 2105
          break;
        }
      }
2106
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
5
54liuyao 已提交
2107
        break;
J
jiacy-jcy 已提交
2108
      } else {
2109
        continue;
5
54liuyao 已提交
2110
      }
H
Haojun Liao 已提交
2111 2112 2113 2114
    }

    // record the scan action.
    pInfo->numOfExec++;
2115
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
2116
    // printDataBlock(pInfo->pRes, "stream scan");
H
Haojun Liao 已提交
2117

X
Xiaoyu Wang 已提交
2118
    qDebug("scan rows: %" PRId64, pBlockInfo->rows);
L
Liu Jicong 已提交
2119 2120 2121
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }
2122 2123 2124 2125 2126 2127

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
L
Liu Jicong 已提交
2128 2129 2130
  } else {
    ASSERT(0);
    return NULL;
H
Haojun Liao 已提交
2131 2132 2133
  }
}

H
Haojun Liao 已提交
2134
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2135 2136 2137
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2138 2139 2140
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2141 2142 2143 2144 2145 2146
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2147
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
L
Liu Jicong 已提交
2148 2149
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
2150
  SStreamRawScanInfo* pInfo = pOperator->info;
D
dapan1121 已提交
2151
  int32_t             code = TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
2152
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;  // use metaRspLen !=0 to judge if data is meta
wmmhello's avatar
wmmhello 已提交
2153
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
2154

wmmhello's avatar
wmmhello 已提交
2155
  qDebug("tmqsnap doRawScan called");
2156
  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
D
dapan1121 已提交
2157 2158 2159 2160 2161
    bool hasNext = false;
    if (pInfo->dataReader) {
      code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
      if (code) {
        tsdbReleaseDataBlock(pInfo->dataReader);
2162
        T_LONG_JMP(pTaskInfo->env, code);
D
dapan1121 已提交
2163 2164
      }
    }
X
Xiaoyu Wang 已提交
2165

D
dapan1121 已提交
2166
    if (pInfo->dataReader && hasNext) {
wmmhello's avatar
wmmhello 已提交
2167
      if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2168
        tsdbReleaseDataBlock(pInfo->dataReader);
2169
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
wmmhello's avatar
wmmhello 已提交
2170
      }
2171

H
Haojun Liao 已提交
2172 2173
      SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
      if (pBlock == NULL) {
2174
        T_LONG_JMP(pTaskInfo->env, terrno);
wmmhello's avatar
wmmhello 已提交
2175 2176
      }

H
Haojun Liao 已提交
2177
      qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
2178
      tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pBlock->info.id.uid, pBlock->info.window.ekey);
wmmhello's avatar
wmmhello 已提交
2179 2180
      return pBlock;
    }
wmmhello's avatar
wmmhello 已提交
2181 2182

    SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
X
Xiaoyu Wang 已提交
2183
    STqOffsetVal   offset = {0};
L
Liu Jicong 已提交
2184
    if (mtInfo.uid == 0) {  // read snapshot done, change to get data from wal
wmmhello's avatar
wmmhello 已提交
2185
      qDebug("tmqsnap read snapshot done, change to get data from wal");
2186
      tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion);
L
Liu Jicong 已提交
2187
    } else {
2188
      tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN);
2189
      qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
wmmhello's avatar
wmmhello 已提交
2190
    }
2191
    qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
2192
    tDeleteSSchemaWrapper(mtInfo.schema);
wmmhello's avatar
wmmhello 已提交
2193
    return NULL;
2194
  } else if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
L
Liu Jicong 已提交
2195 2196 2197 2198 2199 2200
    SSnapContext* sContext = pInfo->sContext;
    void*         data = NULL;
    int32_t       dataLen = 0;
    int16_t       type = 0;
    int64_t       uid = 0;
    if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
wmmhello's avatar
wmmhello 已提交
2201
      qError("tmqsnap getMetafromSnapShot error");
wmmhello's avatar
wmmhello 已提交
2202
      taosMemoryFreeClear(data);
2203 2204 2205
      return NULL;
    }

2206
    if (!sContext->queryMeta) {  // change to get data next poll request
2207 2208 2209
      STqOffsetVal offset = {0};
      tqOffsetResetToData(&offset, 0, INT64_MIN);
      qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
L
Liu Jicong 已提交
2210
    } else {
2211
      tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid);
wmmhello's avatar
wmmhello 已提交
2212 2213 2214 2215
      pTaskInfo->streamInfo.metaRsp.resMsgType = type;
      pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
      pTaskInfo->streamInfo.metaRsp.metaRsp = data;
    }
2216

wmmhello's avatar
wmmhello 已提交
2217
    return NULL;
2218
  }
L
Liu Jicong 已提交
2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256
  //  else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
  //    int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
  //
  //    while(1){
  //      if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
  //        qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
  //        pTaskInfo->streamInfo.lastStatus.version = fetchVer;
  //        pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
  //        return NULL;
  //      }
  //      SWalCont* pHead = &pInfo->pCkHead->head;
  //      qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
  //
  //      if (pHead->msgType == TDMT_VND_SUBMIT) {
  //        SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
  //        tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
  //        SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid,
  //        &pInfo->pRes); if(block){
  //          pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
  //          pTaskInfo->streamInfo.lastStatus.version = fetchVer;
  //          qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
  //          return block;
  //        }else{
  //          fetchVer++;
  //        }
  //      } else{
  //        ASSERT(pInfo->sContext->withMeta);
  //        ASSERT(IS_META_MSG(pHead->msgType));
  //        qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
  //        pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
  //        pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
  //        pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
  //        pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
  //        pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
  //        memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
  //        return NULL;
  //      }
  //    }
2257 2258 2259
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2260
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2261 2262 2263
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
2264
  tableListDestroy(pRawScan->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2265 2266 2267
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2268 2269 2270
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2271
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2272 2273 2274 2275 2276
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2277 2278
  int32_t code = TSDB_CODE_SUCCESS;

2279
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2280
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2281
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2282 2283
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2284 2285
  }

2286
  pInfo->pTableListInfo = tableListCreate();
wmmhello's avatar
wmmhello 已提交
2287 2288
  pInfo->vnode = pHandle->vnode;

2289
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2290 2291
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2292

2293
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2294
  return pOperator;
H
Haojun Liao 已提交
2295

L
Liu Jicong 已提交
2296
_end:
H
Haojun Liao 已提交
2297 2298 2299 2300
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2301 2302
}

2303
static void destroyStreamScanOperatorInfo(void* param) {
2304
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
2305

2306
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
5
54liuyao 已提交
2307
    destroyOperatorInfo(pStreamScan->pTableScanOp);
2308
  }
2309

2310 2311 2312
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
H
Haojun Liao 已提交
2313 2314
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2315
  }
C
Cary Xu 已提交
2316 2317
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2318
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2319
  }
C
Cary Xu 已提交
2320

L
Liu Jicong 已提交
2321
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
5
54liuyao 已提交
2322
  cleanupExprSupp(&pStreamScan->tagCalSup);
L
Liu Jicong 已提交
2323

L
Liu Jicong 已提交
2324
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2325 2326 2327 2328
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2329
  blockDataDestroy(pStreamScan->pUpdateDataRes);
5
54liuyao 已提交
2330
  blockDataDestroy(pStreamScan->pCreateTbRes);
2331 2332 2333 2334
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2335
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2336
                                            STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2337
  SArray*          pColIds = NULL;
2338 2339
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2340

H
Haojun Liao 已提交
2341
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2342
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2343
    tableListDestroy(pTableListInfo);
2344
    goto _error;
H
Haojun Liao 已提交
2345 2346
  }

2347
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2348
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2349

2350
  pInfo->pTagCond = pTagCond;
2351
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2352

2353
  int32_t numOfCols = 0;
2354 2355
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2356
  if (code != TSDB_CODE_SUCCESS) {
2357
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2358 2359
    goto _error;
  }
2360

H
Haojun Liao 已提交
2361
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2362
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2363
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2364
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2365 2366

    int16_t colId = id->colId;
2367
    taosArrayPush(pColIds, &colId);
2368
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2369
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2370
    }
H
Haojun Liao 已提交
2371 2372
  }

L
Liu Jicong 已提交
2373 2374 2375 2376
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2377
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2378 2379
      goto _error;
    }
2380

L
Liu Jicong 已提交
2381 2382 2383
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
2384
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2385 2386 2387 2388
      goto _error;
    }
  }

2389 2390
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
5
54liuyao 已提交
2391
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
2392 2393
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2394
      tableListDestroy(pTableListInfo);
2395 2396 2397 2398
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2399
      tableListDestroy(pTableListInfo);
2400 2401 2402 2403
      goto _error;
    }
  }

L
Liu Jicong 已提交
2404
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
H
Haojun Liao 已提交
2405
  if (pInfo->pBlockLists == NULL) {
2406
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2407
    tableListDestroy(pTableListInfo);
2408
    goto _error;
H
Haojun Liao 已提交
2409 2410
  }

5
54liuyao 已提交
2411
  if (pHandle->vnode) {
2412
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
L
Liu Jicong 已提交
2413
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2414
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2415
      pTSInfo->base.cond.endVersion = pHandle->version;
2416
    }
L
Liu Jicong 已提交
2417

2418
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2419
    int32_t        num = 0;
2420
    tableListGetGroupList(pTableListInfo, 0, &pList, &num);
2421

2422
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2423
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2424
      pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
2425 2426
    }

L
Liu Jicong 已提交
2427 2428 2429 2430
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2431
    } else {
L
Liu Jicong 已提交
2432 2433
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2434 2435
    }

2436
    pInfo->pUpdateInfo = NULL;
2437
    pInfo->pTableScanOp = pTableScanOp;
2438 2439 2440
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2441

L
Liu Jicong 已提交
2442
    pInfo->readHandle = *pHandle;
L
Liu Jicong 已提交
2443
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
5
54liuyao 已提交
2444 2445
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
L
Liu Jicong 已提交
2446

L
Liu Jicong 已提交
2447
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2448
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
2449
    SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
2450
    code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2451 2452 2453 2454
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
2455

L
Liu Jicong 已提交
2456
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2457
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2458 2459
  } else {
    taosArrayDestroy(pColIds);
2460
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2461
    pColIds = NULL;
5
54liuyao 已提交
2462 2463
  }

2464 2465 2466 2467 2468
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2469 2470 2471 2472 2473
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2474
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2475
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2476
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2477
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2478
  pInfo->groupId = 0;
2479
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2480
  pInfo->pStreamScanOp = pOperator;
2481
  pInfo->deleteDataIndex = 0;
2482
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2483
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2484
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2485
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2486
  pInfo->partitionSup.needCalc = false;
5
54liuyao 已提交
2487 2488
  pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
  pInfo->igExpired = pTableScanNode->igExpired;
2489
  pInfo->twAggSup.maxTs = INT64_MIN;
L
Liu Jicong 已提交
2490

L
Liu Jicong 已提交
2491 2492
  setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2493
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2494

2495
  __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2496 2497
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2498

H
Haojun Liao 已提交
2499
  return pOperator;
2500

L
Liu Jicong 已提交
2501
_error:
H
Haojun Liao 已提交
2502 2503 2504 2505 2506 2507 2508 2509
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2510 2511
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2512 2513
}

2514
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
2515 2516 2517 2518
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

2519 2520 2521
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  STagScanInfo* pInfo = pOperator->info;
2522
  SExprInfo*    pExprInfo = &pOperator->exprSupp.pExprInfo[0];
2523
  SSDataBlock*  pRes = pInfo->pRes;
2524
  blockDataCleanup(pRes);
H
Haojun Liao 已提交
2525

2526
  int32_t size = tableListGetSize(pInfo->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2527
  if (size == 0) {
H
Haojun Liao 已提交
2528 2529 2530 2531
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

2532 2533 2534
  char        str[512] = {0};
  int32_t     count = 0;
  SMetaReader mr = {0};
2535
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);
H
Haojun Liao 已提交
2536

wmmhello's avatar
wmmhello 已提交
2537
  while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
2538
    STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
L
Liu Jicong 已提交
2539
    int32_t        code = metaGetTableEntryByUid(&mr, item->uid);
2540
    tDecoderClear(&mr.coder);
H
Haojun Liao 已提交
2541
    if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
2542 2543
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
2544
      metaReaderClear(&mr);
2545
      T_LONG_JMP(pTaskInfo->env, terrno);
H
Haojun Liao 已提交
2546
    }
H
Haojun Liao 已提交
2547

2548
    for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
2549 2550 2551 2552 2553
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
        STR_TO_VARSTR(str, mr.me.name);
2554
        colDataSetVal(pDst, count, str, false);
2555
      } else {  // it is a tag value
wmmhello's avatar
wmmhello 已提交
2556 2557
        STagVal val = {0};
        val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
2558
        const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val);
wmmhello's avatar
wmmhello 已提交
2559

2560 2561 2562 2563
        char* data = NULL;
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
          data = tTagValToData((const STagVal*)p, false);
        } else {
wmmhello's avatar
wmmhello 已提交
2564 2565
          data = (char*)p;
        }
2566
        colDataSetVal(pDst, count, data,
L
Liu Jicong 已提交
2567
                      (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
2568

2569 2570
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
            data != NULL) {
wmmhello's avatar
wmmhello 已提交
2571
          taosMemoryFree(data);
wmmhello's avatar
wmmhello 已提交
2572
        }
H
Haojun Liao 已提交
2573 2574 2575
      }
    }

2576
    count += 1;
wmmhello's avatar
wmmhello 已提交
2577
    if (++pInfo->curPos >= size) {
H
Haojun Liao 已提交
2578
      setOperatorCompleted(pOperator);
H
Haojun Liao 已提交
2579 2580 2581
    }
  }

2582 2583
  metaReaderClear(&mr);

2584
  // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
H
Haojun Liao 已提交
2585
  if (pOperator->status == OP_EXEC_DONE) {
2586
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
H
Haojun Liao 已提交
2587 2588 2589
  }

  pRes->info.rows = count;
wmmhello's avatar
wmmhello 已提交
2590
  pOperator->resultInfo.totalRows += count;
2591

2592
  return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
H
Haojun Liao 已提交
2593 2594
}

2595
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2596 2597
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2598
  taosArrayDestroy(pInfo->matchInfo.pList);
2599
  pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
D
dapan1121 已提交
2600
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2601 2602
}

S
slzhou 已提交
2603
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
X
Xiaoyu Wang 已提交
2604
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
2605
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
2606 2607 2608 2609 2610
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2611 2612 2613 2614
  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
2615
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
2616 2617 2618
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2619

H
Haojun Liao 已提交
2620 2621
  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
2622 2623 2624
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2625

2626
  pInfo->pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
2627
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2628 2629
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
2630

L
Liu Jicong 已提交
2631 2632
  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2633
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2634 2635
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

L
Liu Jicong 已提交
2636 2637
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
2638 2639

  return pOperator;
2640

2641
_error:
H
Haojun Liao 已提交
2642 2643 2644 2645 2646
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2647

dengyihao's avatar
dengyihao 已提交
2648
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2649 2650 2651 2652 2653 2654
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
2655
  int32_t                         code = 0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2656

H
Haojun Liao 已提交
2657
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2658

L
Liu Jicong 已提交
2659
  int64_t      st = taosGetTimestampUs();
2660
  void*        p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2661
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2662

D
dapan1121 已提交
2663
  if (NULL == source->dataReader || !source->multiReader) {
D
dapan1121 已提交
2664
    code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false);
2665 2666 2667
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
dengyihao's avatar
dengyihao 已提交
2668
  }
X
Xiaoyu Wang 已提交
2669

2670
  pInfo->base.dataReader = source->dataReader;
H
Haojun Liao 已提交
2671
  STsdbReader* reader = pInfo->base.dataReader;
X
Xiaoyu Wang 已提交
2672
  bool         hasNext = false;
2673
  qTrace("tsdb/read-table-data: %p, enter next reader", reader);
D
dapan1121 已提交
2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685

  while (true) {
    code = tsdbNextDataBlock(reader, &hasNext);
    if (code != 0) {
      tsdbReleaseDataBlock(reader);
      pInfo->base.dataReader = NULL;
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
2686

H
Haojun Liao 已提交
2687
    if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2688
      tsdbReleaseDataBlock(reader);
D
dapan1121 已提交
2689
      pInfo->base.dataReader = NULL;
2690
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2691 2692 2693
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2694
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2695 2696 2697 2698
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2699
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2700 2701 2702 2703
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2704 2705

    uint32_t status = 0;
2706
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2707
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2708
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2709
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2710 2711 2712 2713 2714 2715 2716
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

2717
    pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2718

H
Haojun Liao 已提交
2719
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2720
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2721

2722
    qTrace("tsdb/read-table-data: %p, close reader", reader);
D
dapan1121 已提交
2723 2724 2725 2726
    if (!source->multiReader) {
      tsdbReaderClose(pInfo->base.dataReader);
      source->dataReader = NULL;
    }
H
Haojun Liao 已提交
2727
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2728 2729
    return pBlock;
  }
H
Haojun Liao 已提交
2730

D
dapan1121 已提交
2731 2732 2733 2734
  if (!source->multiReader) {
    tsdbReaderClose(pInfo->base.dataReader);
    source->dataReader = NULL;
  }
H
Haojun Liao 已提交
2735
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2736 2737 2738
  return NULL;
}

2739 2740 2741
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsTargetSlotId = 0;
  for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
H
Haojun Liao 已提交
2742
    SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
2743
    if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2744
      tsTargetSlotId = colInfo->dstSlotId;
2745 2746 2747
    }
  }

2748 2749 2750
  SArray*         pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  SBlockOrderInfo bi = {0};
  bi.order = order;
2751
  bi.slotId = tsTargetSlotId;
2752 2753 2754 2755 2756 2757 2758
  bi.nullFirst = NULL_ORDER_FIRST;

  taosArrayPush(pList, &bi);

  return pList;
}

H
Haojun Liao 已提交
2759
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2760 2761 2762 2763 2764 2765 2766
  memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  for (int i = 0; i < src->numOfCols; i++) {
    dst->colList[i] = src->colList[i];
  }
  return 0;
}
H
Haojun Liao 已提交
2767

2768
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
2769 2770 2771
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

S
slzhou 已提交
2772
  {
2773
    size_t  numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
S
slzhou 已提交
2774
    int32_t i = pInfo->tableStartIndex + 1;
H
Haojun Liao 已提交
2775
    for (; i < numOfTables; ++i) {
2776
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
S
slzhou 已提交
2777 2778 2779 2780 2781 2782
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }
2783

S
slzhou 已提交
2784 2785
  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;
2786

H
Haojun Liao 已提交
2787
  pInfo->base.dataReader = NULL;
2788

2789 2790
  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
S
slzhou 已提交
2791
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
2792
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
L
Liu Jicong 已提交
2793 2794
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);
2795

dengyihao's avatar
dengyihao 已提交
2796
  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
dengyihao's avatar
opt mem  
dengyihao 已提交
2797 2798 2799 2800 2801 2802

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  for (int32_t i = 0; i < numOfTable; ++i) {
2803 2804 2805
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
D
dapan1121 已提交
2806
    param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false;
2807
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
H
Haojun Liao 已提交
2808 2809
    blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);

2810
    taosArrayPush(pInfo->sortSourceParams, &param);
dengyihao's avatar
opt mem  
dengyihao 已提交
2811 2812

    SQueryTableDataCond cond;
H
Haojun Liao 已提交
2813
    dumpQueryTableCond(&pInfo->base.cond, &cond);
dengyihao's avatar
opt mem  
dengyihao 已提交
2814
    taosArrayPush(pInfo->queryConds, &cond);
2815 2816
  }

dengyihao's avatar
opt mem  
dengyihao 已提交
2817
  for (int32_t i = 0; i < numOfTable; ++i) {
2818
    SSortSource*                    ps = taosMemoryCalloc(1, sizeof(SSortSource));
2819
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
2820
    ps->param = param;
2821
    ps->onlyRef = true;
2822 2823 2824 2825 2826 2827
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
2828
    T_LONG_JMP(pTaskInfo->env, terrno);
2829 2830
  }

2831 2832 2833 2834 2835 2836 2837
  return TSDB_CODE_SUCCESS;
}

int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

dengyihao's avatar
dengyihao 已提交
2838
  int32_t numOfTable = taosArrayGetSize(pInfo->queryConds);
2839

2840 2841 2842 2843 2844 2845 2846
  SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
  pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
  pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
  pInfo->sortExecInfo.loops += sortExecInfo.loops;
  pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
  pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;

dengyihao's avatar
dengyihao 已提交
2847
  for (int32_t i = 0; i < numOfTable; ++i) {
2848 2849
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    blockDataDestroy(param->inputBlock);
2850 2851
    tsdbReaderClose(param->dataReader);
    param->dataReader = NULL;
2852
  }
2853 2854
  taosArrayClear(pInfo->sortSourceParams);

2855
  tsortDestroySortHandle(pInfo->pSortHandle);
dengyihao's avatar
dengyihao 已提交
2856
  pInfo->pSortHandle = NULL;
2857

dengyihao's avatar
opt mem  
dengyihao 已提交
2858 2859 2860
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) {
    SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
    taosMemoryFree(cond->colList);
2861
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2862 2863 2864
  taosArrayDestroy(pInfo->queryConds);
  pInfo->queryConds = NULL;

2865
  resetLimitInfoForNextGroup(&pInfo->limitInfo);
2866 2867 2868
  return TSDB_CODE_SUCCESS;
}

2869 2870
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2871 2872
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2873 2874 2875
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2876
  blockDataCleanup(pResBlock);
2877 2878

  while (1) {
2879
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2880 2881 2882 2883
    if (pTupleHandle == NULL) {
      break;
    }

2884 2885
    appendOneRowToDataBlock(pResBlock, pTupleHandle);
    if (pResBlock->info.rows >= capacity) {
2886 2887 2888 2889
      break;
    }
  }

2890
  bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
D
dapan1121 已提交
2891
  qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
2892
         pInfo->limitInfo.numOfOutputRows);
2893

2894
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906
}

SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
2907
    T_LONG_JMP(pTaskInfo->env, code);
2908
  }
2909

2910
  size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
S
slzhou 已提交
2911 2912
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;
2913

S
slzhou 已提交
2914
    if (tableListSize == 0) {
H
Haojun Liao 已提交
2915
      setOperatorCompleted(pOperator);
2916 2917
      return NULL;
    }
S
slzhou 已提交
2918
    pInfo->tableStartIndex = 0;
2919
    pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
2920 2921
    startGroupTableMergeScan(pOperator);
  }
2922

S
slzhou 已提交
2923 2924
  SSDataBlock* pBlock = NULL;
  while (pInfo->tableStartIndex < tableListSize) {
2925 2926 2927 2928
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

L
Liu Jicong 已提交
2929 2930
    pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
                                              pOperator);
S
slzhou 已提交
2931
    if (pBlock != NULL) {
H
Haojun Liao 已提交
2932
      pBlock->info.id.groupId = pInfo->groupId;
S
slzhou 已提交
2933 2934 2935
      pOperator->resultInfo.totalRows += pBlock->info.rows;
      return pBlock;
    } else {
2936
      // Data of this group are all dumped, let's try the next group
S
slzhou 已提交
2937 2938
      stopGroupTableMergeScan(pOperator);
      if (pInfo->tableEndIndex >= tableListSize - 1) {
H
Haojun Liao 已提交
2939
        setOperatorCompleted(pOperator);
S
slzhou 已提交
2940 2941
        break;
      }
2942

S
slzhou 已提交
2943
      pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
2944
      pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
S
slzhou 已提交
2945
      startGroupTableMergeScan(pOperator);
X
Xiaoyu Wang 已提交
2946
      resetLimitInfoForNextGroup(&pInfo->limitInfo);
S
slzhou 已提交
2947
    }
wmmhello's avatar
wmmhello 已提交
2948 2949
  }

2950 2951 2952
  return pBlock;
}

2953
void destroyTableMergeScanOperatorInfo(void* param) {
2954
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2955
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2956

dengyihao's avatar
dengyihao 已提交
2957 2958 2959
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2960 2961
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
2962 2963
    tsdbReaderClose(p->dataReader);
    p->dataReader = NULL;
2964
  }
H
Haojun Liao 已提交
2965

D
dapan1121 已提交
2966 2967 2968
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;

2969
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2970 2971
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2972

dengyihao's avatar
opt mem  
dengyihao 已提交
2973 2974 2975
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2976 2977
  }

2978 2979
  taosArrayDestroy(pTableScanInfo->queryConds);
  destroyTableScanBase(&pTableScanInfo->base);
2980 2981 2982 2983 2984

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
D
dapan1121 已提交
2985
  taosMemoryFreeClear(param);
2986 2987 2988 2989
}

int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
2990 2991
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
L
Liu Jicong 已提交
2992
  STableMergeScanInfo*     pInfo = pOptr->info;
H
Haojun Liao 已提交
2993
  execInfo->blockRecorder = pInfo->base.readRecorder;
2994
  execInfo->sortExecInfo = pInfo->sortExecInfo;
2995 2996 2997

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);
L
Liu Jicong 已提交
2998

2999 3000 3001
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3002
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
3003
                                                STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
3004 3005 3006 3007 3008
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3009

3010 3011 3012
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
3013
  int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
H
Haojun Liao 已提交
3014
                                     &pInfo->base.matchInfo);
H
Haojun Liao 已提交
3015 3016 3017
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3018

H
Haojun Liao 已提交
3019
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
3020
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3021
    taosArrayDestroy(pInfo->base.matchInfo.pList);
3022 3023 3024 3025
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
3026
    SExprSupp* pSup = &pInfo->base.pseudoSup;
3027 3028
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
3029 3030 3031 3032
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

H
Haojun Liao 已提交
3033 3034 3035 3036 3037 3038
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

H
Haojun Liao 已提交
3039 3040
  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
3041
  pInfo->base.readHandle = *readHandle;
3042 3043 3044

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;
3045
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
3046

3047
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3048
  pInfo->sample.seed = taosGetTimestampSec();
H
Haojun Liao 已提交
3049 3050 3051 3052 3053 3054

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
3055
  initResultSizeInfo(&pOperator->resultInfo, 1024);
H
Haojun Liao 已提交
3056
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
3057 3058
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

3059
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3060

H
Haojun Liao 已提交
3061
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
3062
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3063
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
3064

dengyihao's avatar
dengyihao 已提交
3065
  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
A
Alex Duan 已提交
3066 3067
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
3068

L
Liu Jicong 已提交
3069 3070
  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3071
  pOperator->exprSupp.numOfExprs = numOfCols;
3072

3073 3074
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
3075 3076 3077 3078 3079 3080 3081 3082 3083
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
3084 3085 3086 3087

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
3088
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
3089 3090 3091 3092 3093 3094
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName);
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
3095 3096
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
3097 3098 3099 3100 3101 3102 3103
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, scanCols) {
      if (nodeType(pNode) != QUERY_NODE_TARGET) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      STargetNode* targetNode = (STargetNode*)pNode;
      if (nodeType(targetNode->pExpr) != QUERY_NODE_COLUMN) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SColumnNode* colNode = (SColumnNode*)(targetNode->pExpr);
      if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
        supp->dbNameSlotId = targetNode->slotId;
      } else if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
        supp->stbNameSlotId = targetNode->slotId;
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}

int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, pseudoCols) {
      if (nodeType(pNode) != QUERY_NODE_TARGET) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      STargetNode* targetNode = (STargetNode*)pNode;
      if (nodeType(targetNode->pExpr) != QUERY_NODE_FUNCTION) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SFunctionNode* funcNode = (SFunctionNode*)(targetNode->pExpr);
      if (funcNode->funcType == FUNCTION_TYPE_TABLE_COUNT) {
        supp->tbCountSlotId = targetNode->slotId;
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}

int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, groupTags) {
      if (nodeType(pNode) != QUERY_NODE_COLUMN) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SColumnNode* colNode = (SColumnNode*)pNode;
      if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
        supp->groupByDbName = true;
      }
      if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
        supp->groupByStbName = true;
      }
    }
  } else {
H
Haojun Liao 已提交
3165 3166
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
S
slzhou 已提交
3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194
  }
  return TSDB_CODE_SUCCESS;
}

int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t code = 0;
  code = tblCountScanGetInputs(groupTags, tableName, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return code;
  }
  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  code = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return code;
  }
  code = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
    return code;
  }
  return code;
}
S
shenglian zhou 已提交
3195

S
slzhou 已提交
3196
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3197 3198 3199
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3200
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3201
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3202
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3203 3204 3205 3206 3207 3208 3209 3210 3211

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3212
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3213 3214
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3215 3216 3217
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3218 3219 3220

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3221 3222
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3234 3235 3236
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
3237
    ASSERT(strlen(dbName));
S
slzhou 已提交
3238
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);
H
Haojun Liao 已提交
3239 3240 3241 3242

    char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);

S
slzhou 已提交
3243
    varDataSetLen(varDbName, strlen(dbName));
3244
    colDataSetVal(colInfoData, 0, varDbName, false);
S
slzhou 已提交
3245 3246 3247 3248
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
3249
    if (strlen(stbName) != 0) {
S
slzhou 已提交
3250
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
H
Haojun Liao 已提交
3251
      strncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
3252
      varDataSetLen(varStbName, strlen(stbName));
3253
      colDataSetVal(colInfoData, 0, varStbName, false);
3254
    } else {
3255
      colDataSetNULL(colInfoData, 0);
3256
    }
S
slzhou 已提交
3257 3258 3259
  }

  if (pSupp->tbCountSlotId != -1) {
S
slzhou 已提交
3260
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
3261
    colDataSetVal(colInfoData, 0, (char*)&count, false);
S
slzhou 已提交
3262 3263 3264 3265
  }
  pRes->info.rows = 1;
}

S
slzhou 已提交
3266
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
S
slzhou 已提交
3267 3268 3269
  STableCountScanSupp* pSupp = &pInfo->supp;
  SSDataBlock*         pRes = pInfo->pRes;

S
slzhou 已提交
3270
  size_t infodbTableNum;
S
slzhou 已提交
3271
  getInfosDbMeta(NULL, &infodbTableNum);
S
slzhou 已提交
3272
  size_t perfdbTableNum;
S
slzhou 已提交
3273 3274
  getPerfDbMeta(NULL, &perfdbTableNum);

3275
  if (pSupp->groupByDbName || pSupp->groupByStbName) {
S
slzhou 已提交
3276
    buildSysDbGroupedTableCount(pOperator, pInfo, pSupp, pRes, infodbTableNum, perfdbTableNum);
S
slzhou 已提交
3277 3278
    return (pRes->info.rows > 0) ? pRes : NULL;
  } else {
S
slzhou 已提交
3279
    buildSysDbFilterTableCount(pOperator, pSupp, pRes, infodbTableNum, perfdbTableNum);
S
slzhou 已提交
3280 3281 3282 3283
    return (pRes->info.rows > 0) ? pRes : NULL;
  }
}

S
slzhou 已提交
3284 3285 3286 3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297 3298 3299
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  if (strcmp(pSupp->dbNameFilter, TSDB_INFORMATION_SCHEMA_DB) == 0) {
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (strcmp(pSupp->dbNameFilter, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  } else if (strlen(pSupp->dbNameFilter) == 0) {
    fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
  }
  setOperatorCompleted(pOperator);
}

static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  if (pInfo->currGrpIdx == 0) {
3300 3301 3302 3303 3304 3305
    uint64_t groupId = 0;
    if (pSupp->groupByDbName) {
      groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
    } else {
      groupId = calcGroupId("", 0);
    }
X
Xiaoyu Wang 已提交
3306

S
slzhou 已提交
3307 3308 3309
    pRes->info.id.groupId = groupId;
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (pInfo->currGrpIdx == 1) {
3310 3311 3312 3313 3314 3315 3316
    uint64_t groupId = 0;
    if (pSupp->groupByDbName) {
      groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
    } else {
      groupId = calcGroupId("", 0);
    }

S
slzhou 已提交
3317 3318 3319 3320 3321 3322 3323 3324
    pRes->info.id.groupId = groupId;
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  } else {
    setOperatorCompleted(pOperator);
  }
  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3325
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
S
slzhou 已提交
3326 3327 3328 3329
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  STableCountScanOperatorInfo* pInfo = pOperator->info;
  STableCountScanSupp*         pSupp = &pInfo->supp;
  SSDataBlock*                 pRes = pInfo->pRes;
S
slzhou 已提交
3330
  blockDataCleanup(pRes);
3331

S
slzhou 已提交
3332 3333 3334
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }
S
slzhou 已提交
3335
  if (pInfo->readHandle.mnd != NULL) {
S
slzhou 已提交
3336
    return buildSysDbTableCount(pOperator, pInfo);
S
slzhou 已提交
3337
  }
S
slzhou 已提交
3338

S
slzhou 已提交
3339 3340 3341 3342 3343
  return buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
}

static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
S
slzhou 已提交
3344 3345
  const char* db = NULL;
  int32_t     vgId = 0;
S
slzhou 已提交
3346
  char        dbName[TSDB_DB_NAME_LEN] = {0};
S
slzhou 已提交
3347

S
slzhou 已提交
3348 3349 3350 3351 3352 3353
  // get dbname
  vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
  SName sn = {0};
  tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
  tNameGetDbName(&sn, dbName);

3354
  if (pSupp->groupByDbName || pSupp->groupByStbName) {
S
slzhou 已提交
3355 3356 3357 3358 3359 3360 3361 3362 3363 3364 3365 3366 3367 3368
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
  } else {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
  }
  return pRes->info.rows > 0 ? pRes : NULL;
}

static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  if (pSupp->groupByStbName) {
    if (pInfo->stbUidList == NULL) {
      pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
      if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
        qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
S
slzhou 已提交
3369
      }
S
slzhou 已提交
3370 3371 3372 3373 3374 3375 3376 3377 3378 3379
    }
    if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
      tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
      buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid);

      pInfo->currGrpIdx++;
    } else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
      buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);

      pInfo->currGrpIdx++;
S
slzhou 已提交
3380
    } else {
S
slzhou 已提交
3381
      setOperatorCompleted(pOperator);
S
slzhou 已提交
3382 3383
    }
  } else {
S
slzhou 已提交
3384 3385 3386 3387 3388 3389 3390 3391 3392 3393 3394 3395 3396 3397 3398 3399 3400
    uint64_t groupId = calcGroupId(dbName, strlen(dbName));
    pRes->info.id.groupId = groupId;
    int64_t dbTableCount = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
    setOperatorCompleted(pOperator);
  }
}

static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  if (strlen(pSupp->dbNameFilter) != 0) {
    if (strlen(pSupp->stbNameFilter) != 0) {
      tb_uid_t      uid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter);
      SMetaStbStats stats = {0};
      metaGetStbStats(pInfo->readHandle.meta, uid, &stats);
      int64_t ctbNum = stats.ctbNum;
      fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, ctbNum, pRes);
S
slzhou 已提交
3401 3402 3403
    } else {
      int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
      fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
S
slzhou 已提交
3404
    }
S
slzhou 已提交
3405 3406 3407
  } else {
    int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
S
slzhou 已提交
3408
  }
S
slzhou 已提交
3409 3410 3411 3412 3413 3414
  setOperatorCompleted(pOperator);
}

static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName) {
  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
3415 3416 3417
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
  }
X
Xiaoyu Wang 已提交
3418

S
slzhou 已提交
3419 3420 3421
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;
  int64_t ntbNum = metaGetNtbNum(pInfo->readHandle.meta);
3422 3423 3424
  if (ntbNum != 0) {
    fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
  }
S
slzhou 已提交
3425 3426 3427 3428 3429 3430 3431 3432
}

static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
  char stbName[TSDB_TABLE_NAME_LEN] = {0};
  metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);

  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
3433 3434 3435 3436 3437
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
  } else {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s", stbName);
  }
X
Xiaoyu Wang 已提交
3438

S
slzhou 已提交
3439 3440 3441 3442 3443 3444 3445 3446
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;

  SMetaStbStats stats = {0};
  metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats);
  int64_t ctbNum = stats.ctbNum;

  fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
S
shenglian zhou 已提交
3447 3448 3449
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3450
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3451 3452
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3453
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3454 3455
  taosMemoryFreeClear(param);
}