scanoperator.c 130.6 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

D
dapan1121 已提交
34 35 36
// NOTE(review): presumably a debug switch for the scan operators; it is only defined (as 0) here and is
// not read anywhere in the visible part of this file - confirm where it is consumed.
int32_t scanDebug = 0;


D
dapan1121 已提交
37
// NOTE(review): presumably an upper bound on the number of tables for the multi-reader mode; it is not
// referenced in the visible part of this file - confirm against its users.
#define MULTI_READER_MAX_TABLE_NUM 5000
H
Haojun Liao 已提交
38
// Set the `scanFlag` member of the object pointed to by `_info` to REVERSE_SCAN.
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
39
// Flip `n` in place: TSDB_ORDER_ASC becomes TSDB_ORDER_DESC, any other value becomes TSDB_ORDER_ASC.
// `n` must be a modifiable lvalue and is evaluated more than once, so it must be free of side effects.
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
40

H
Haojun Liao 已提交
41 42 43 44 45 46 47 48 49
// Execution statistics bundle of the table merge scan: a file block load recorder plus sort statistics.
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;  // block loading counters
  SSortExecInfo          sortExecInfo;   // statistics of the sort phase
} STableMergeScanExecInfo;

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
  uint64_t       uid;
50
  SSDataBlock*   inputBlock;  
D
dapan1121 已提交
51
  bool           multiReader;
52
  STsdbReader*   dataReader;
H
Haojun Liao 已提交
53 54
} STableMergeScanSortSourceParam;

L
Liu Jicong 已提交
55
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
56

H
Haojun Liao 已提交
57
/*
 * Tell the scanner whether the current data block should be processed.
 *
 * The sampling logic is compiled out (see the disabled branch), so every block is accepted and
 * `pInfo` is currently not inspected.
 */
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
  // disabled: keep roughly one block out of every (1 / sampleRatio) blocks
  if (pInfo->sampleRatio != 1) {
    uint32_t rnd = taosRandR((uint32_t*) &pInfo->seed);
    return 0 == (rnd % ((uint32_t)(1/pInfo->sampleRatio)));
  }

  return true;
#else
  (void)pInfo;
  return true;
#endif
}

70
/*
 * Flip the `order` member of the first `numOfOutput` function contexts in `pCtx`
 * (see SWITCH_ORDER). Nothing is touched when `numOfOutput` is not positive.
 */
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SqlFunctionCtx* pCur = &pCtx[idx];
    SWITCH_ORDER(pCur->order);
    idx += 1;
  }
}

76 77 78 79 80 81 82 83 84
/*
 * Move the time window `tw` in place to the next window in the direction described by `order`.
 *
 * - For every interval unit other than 'n' and 'y' the start key is shifted by `sliding` times the
 *   direction factor and the end key becomes start + interval - 1.
 * - For the units 'n' and 'y' the interval is treated as a number of months ('y' is multiplied by 12)
 *   and the shift is done on the broken-down local time, so the window follows the calendar. In this
 *   branch the window is moved by `interval`; `sliding` is not used.
 *
 * NOTE(review): GET_FORWARD_DIRECTION_FACTOR is presumably +1 for ascending and -1 for descending order.
 */
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int32_t direction = GET_FORWARD_DIRECTION_FACTOR(order);
  const bool    calendarUnit = (pInterval->intervalUnit == 'n') || (pInterval->intervalUnit == 'y');

  if (!calendarUnit) {
    tw->skey += pInterval->sliding * direction;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // number of months covered by one window
  int64_t months = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    months *= 12;
  }

  // the window start key, converted to seconds, broken down into local calendar fields
  int64_t   seconds = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;
  time_t    startSec = (time_t)seconds;
  struct tm localTm;
  taosLocalTime(&startSec, &localTm);

  // start of the next window: shift the absolute month index by one window in scan direction
  int monthIdx = (int)(localTm.tm_year * 12 + localTm.tm_mon + months * direction);
  localTm.tm_year = monthIdx / 12;
  localTm.tm_mon = monthIdx % 12;
  tw->skey =
      convertTimePrecision((int64_t)taosMktime(&localTm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // end of the next window: one window length after its start, minus one time unit
  monthIdx = (int)(monthIdx + months);
  localTm.tm_year = monthIdx / 12;
  localTm.tm_mon = monthIdx % 12;
  tw->ekey =
      convertTimePrecision((int64_t)taosMktime(&localTm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision) -
      1;
}

109
/*
 * Check whether the time range of a data block is touched by more than one interval window.
 *
 * The window that contains the first key of the block in scan direction is computed with
 * getAlignQueryTimeWindow(); the function returns true if the block extends beyond that window or if
 * one of the following windows (see getNextTimeWindow) still intersects the block.
 *
 * @param pInterval   interval definition; `interval == 0` means there is no interval window at all
 * @param pBlockInfo  block whose `window` [skey, ekey] is tested
 * @param order       TSDB_ORDER_ASC scans from skey upwards, any other value from ekey downwards
 * @return true if more than one window intersects the block, false otherwise (also when interval is 0)
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  STimeWindow w = {0};

  // 0 by default, which means the upstream operator is not an interval operator.
  if (pInterval->interval == 0) {
    return false;
  }

  if (order == TSDB_ORDER_ASC) {
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
    ASSERT(w.ekey >= pBlockInfo->window.skey);

    // the first window ends inside the block: the rest of the block belongs to another window
    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      ASSERT(w.ekey > pBlockInfo->window.ekey);
      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
        return true;
      }
    }
  } else {
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
    ASSERT(w.skey <= pBlockInfo->window.ekey);

    // the first window starts inside the block: the rest of the block belongs to another window
    if (w.skey > pBlockInfo->window.skey) {
      return true;
    }

    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      // use the same assertion macro as the rest of this function
      ASSERT(w.skey < pBlockInfo->window.skey);
      if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
        return true;
      }
    }
  }

  return false;
}

160 161 162 163 164 165 166 167 168 169 170
/*
 * Used by the table scanner to look at the intermediate result row that is kept for `groupId` in the
 * aggregate supporter attached to the scan (pdInfo.pAggSup).
 *
 * @param pOperator  must be a QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN operator, otherwise NULL is returned
 * @param groupId    group whose result row is requested
 * @param pPage      out: buffer page returned by getBufPage(); it is written only when a row position
 *                   exists for the group (doDynamicPruneDataBlock releases it with releaseBufPage())
 * @return the result row inside the page, or NULL if no row exists for the group or the page is
 *         not available
 */
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  STableScanInfo* pScan = pOperator->info;

  // build the hash key of the result row of this group
  int64_t keyBuf[2] = {0};
  SET_RES_WINDOW_KEY((char*)keyBuf, &groupId, sizeof(groupId), groupId);

  SResultRowPosition* pPos = (SResultRowPosition*)tSimpleHashGet(
      pScan->base.pdInfo.pAggSup->pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
  if (pPos == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pScan->base.pdInfo.pAggSup->pResultBuf, pPos->pageId);
  return (*pPage != NULL) ? (SResultRow*)((char*)(*pPage) + pPos->offset) : NULL;
}

/*
 * Decide, from the intermediate results already collected for the group of the block, whether the
 * block still has to be loaded.
 *
 * Every expression of the scan's pdInfo.pExprSup is asked through fmFuncDynDataRequired(); only when
 * all of them answer FUNC_DATA_REQUIRED_NOT_LOAD is `*status` set to FUNC_DATA_REQUIRED_NOT_LOAD.
 * In every other case (no expression supporter, no result row for the group, at least one function
 * needs the data) `*status` is left untouched.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScan = pOperator->info;
  SExprSupp*      pExprSup = pScan->base.pdInfo.pExprSup;
  if (pExprSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pPage = NULL;
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // stop at the first function that still needs the data of this block
  int32_t idx = 0;
  for (; idx < pExprSup->numOfExprs; ++idx) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, pExprSup->rowEntryInfoOffset);

    int32_t required = fmFuncDynDataRequired(pExprSup->pCtx[idx].functionId, pEntry, &pBlockInfo->window);
    if (required != FUNC_DATA_REQUIRED_NOT_LOAD) {
      break;
    }
  }
  bool canSkipBlock = (idx >= pExprSup->numOfExprs);

  // the result row is not accessed any more, release the buffer page
  releaseBufPage(pScan->base.pdInfo.pAggSup->pResultBuf, pPage);

  if (canSkipBlock) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
225
/*
 * Evaluate the filter against the block level column aggregates (SMA) with filterRangeExecute().
 *
 * @return the result of filterRangeExecute(); true (keep the block) when either the aggregates or
 *         the filter are missing
 */
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                               int32_t numOfRows) {
  const bool canEvaluate = (pColsAgg != NULL) && (pFilterInfo != NULL);
  if (!canEvaluate) {
    return true;
  }

  return filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
}

H
Haojun Liao 已提交
235
/*
 * Load the block level aggregates (SMA) of the current block through tsdbRetrieveDatablockSMA().
 *
 * A failure of the tsdb call does not return: it jumps out through T_LONG_JMP with the error code.
 *
 * @return the "all columns have aggregates" flag reported by the reader (true if the reader does not
 *         change it)
 */
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  bool    hasAllAgg = true;
  int32_t rc = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, pBlock, &hasAllAgg);
  if (rc != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }

  return hasAllAgg;
}

H
Haojun Liao 已提交
248
/*
 * Fill the tag/pseudo columns of `pBlock` for `rows` rows, if the scan has any pseudo expressions.
 *
 * TSDB_CODE_PAR_TABLE_NOT_EXIST is tolerated because the table may have been dropped while the scan
 * is running; every other error jumps out through T_LONG_JMP. `terrno` is reset to 0 afterwards.
 */
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
                               int32_t rows) {
  SExprSupp* pPseudoSup = &pTableScanInfo->pseudoSup;
  if (pPseudoSup->numOfExprs <= 0) {
    return;
  }

  int32_t rc = addTagPseudoColumnData(&pTableScanInfo->readHandle, pPseudoSup->pExprInfo, pPseudoSup->numOfExprs,
                                      pBlock, rows, GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);

  // a dropped table is not an error for the scan, anything else is
  bool tolerated = (rc == TSDB_CODE_SUCCESS) || (rc == TSDB_CODE_PAR_TABLE_NOT_EXIST);
  if (!tolerated) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }

  // reset the error code.
  terrno = 0;
}

265
/*
 * Apply the remaining offset and the row limit of `pLimitInfo` to `pBlock`.
 *
 * - While an offset remains, leading rows are dropped; a block that is consumed completely by the
 *   offset is emptied and false is returned.
 * - If the limit (-1 means no limit) is reached with this block, the block is cut to the rows that
 *   still fit, the output counter is updated and true is returned.
 * - Otherwise the output counter is advanced by the rows of the block and false is returned.
 *
 * @return true when the output limit has been reached
 */
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SLimit*     pLimit = &pLimitInfo->limit;
  const char* id = GET_TASKID(pTaskInfo);

  if (pLimitInfo->remainOffset > 0) {
    if (pLimitInfo->remainOffset < pBlock->info.rows) {
      // the offset ends inside this block, only the leading rows are dropped
      blockDataTrimFirstRows(pBlock, pLimitInfo->remainOffset);
      pLimitInfo->remainOffset = 0;
    } else {
      // the whole block is consumed by the offset
      pLimitInfo->remainOffset -= pBlock->info.rows;
      blockDataEmpty(pBlock);
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
      return false;
    }
  }

  bool limitReached = false;
  if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
    // limit the output rows
    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, keep);
    limitReached = true;
  }

  pLimitInfo->numOfOutputRows += pBlock->info.rows;
  if (limitReached) {
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
  }

  return limitReached;
}

H
Haojun Liao 已提交
295
/*
 * Decide how much of the current block of the reader is needed and load it accordingly.
 *
 * On return `*status` is one of:
 *  - FUNC_DATA_REQUIRED_FILTEROUT: the block is dropped (by the load flag, by the block SMA filter, or
 *    by dynamic pruning); nothing has been loaded.
 *  - FUNC_DATA_REQUIRED_NOT_LOAD:  only the block info is needed; tag columns are set for one row.
 *  - FUNC_DATA_REQUIRED_SMA_LOAD:  the block SMA has been loaded; tag columns are set for one row.
 *  - FUNC_DATA_REQUIRED_DATA_LOAD: the block data has been loaded, tag columns are set, the filter and
 *    limit/offset have been applied (the block may have 0 rows afterwards).
 *
 * The data is always loaded when the operator has a filter or when the block is touched by more than
 * one interval window (see overlapWithTimeWindow). The counters in `readRecorder` are updated on every
 * path. When the limit is reached the operator is marked as completed.
 *
 * @return TSDB_CODE_SUCCESS, or `terrno` if tsdbRetrieveDataBlock() returns NULL. Errors of the helper
 *         functions jump out through T_LONG_JMP inside those helpers.
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SFileBlockLoadRecorder* pStat = &pTableScanInfo->readRecorder;
  SDataBlockInfo*         pInfo = &pBlock->info;

  pStat->totalBlocks += 1;
  pStat->totalRows += pBlock->info.rows;

  // a filter or a block spanning several interval windows forces the data load
  bool mustLoadData = (pOperator->exprSupp.pFilterInfo != NULL) ||
                      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, pInfo, pTableScanInfo->cond.order);
  *status = mustLoadData ? FUNC_DATA_REQUIRED_DATA_LOAD : pTableScanInfo->dataBlockLoadFlag;

  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
           pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
    pStat->filterOutBlocks += 1;
    pStat->totalRows += pBlock->info.rows;
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  }

  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64, GET_TASKID(pTaskInfo),
           pInfo->window.skey, pInfo->window.ekey, pInfo->rows, pInfo->id.uid);
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
    pStat->skipBlocks += 1;
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  }

  bool smaTried = false;
  if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
    pStat->loadBlockStatis += 1;
    smaTried = true;  // remember that loading the sma has been attempted

    if (doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
      // the block sma is available, the block data itself is not needed
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
             pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
      tsdbReleaseDataBlock(pTableScanInfo->dataReader);
      return TSDB_CODE_SUCCESS;
    }

    // the block statistics do not exist for all columns, load the data block instead
    qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
    *status = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (pOperator->exprSupp.pFilterInfo != NULL && (!smaTried)) {
    if (doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
      size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      bool   keepBlock = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, numOfCols, pInfo->rows);
      if (!keepBlock) {
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
               pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
        pStat->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

        tsdbReleaseDataBlock(pTableScanInfo->dataReader);
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results
  doDynamicPruneDataBlock(pOperator, pInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
           pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
    pStat->skipBlocks += 1;
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
    *status = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }

  pStat->totalCheckedRows += pBlock->info.rows;
  pStat->loadBlocks += 1;

  SSDataBlock* pLoaded = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pLoaded == NULL) {
    return terrno;
  }

  ASSERT(pLoaded == pBlock);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);

  // take the rows of this block out of the total again; they are added back at the end with the
  // number of rows that is left after filter and limit/offset
  pStat->totalRows -= pBlock->info.rows;

  if (pOperator->exprSupp.pFilterInfo != NULL) {
    int64_t startUs = taosGetTimestampUs();
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);

    double elapsedMs = (taosGetTimestampUs() - startUs) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += elapsedMs;

    if (pBlock->info.rows != 0) {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), elapsedMs);
    } else {
      pStat->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pInfo->window.skey, pInfo->window.ekey, pInfo->rows, elapsedMs);
    }
  }

  // the operator is done as soon as the limit is reached
  if (applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo)) {
    setOperatorCompleted(pOperator);
  }

  pStat->totalRows += pBlock->info.rows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
416
/*
 * Switch a table scan to the descending (reverse) scan: the query condition is set to
 * TSDB_ORDER_DESC with the start/end keys of its time window swapped, the scan is flagged as
 * REVERSE_SCAN and the order of the first `numOfOutput` function contexts is flipped.
 */
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  // query condition: descending order over the same time range
  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
  TSWAP(pTableScanInfo->cond.twindows.skey, pTableScanInfo->cond.twindows.ekey);

  // scan state and function contexts
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);
}

425 426
typedef struct STableCachedVal {
  const char* pName;
427
  STag*       pTags;
428 429
} STableCachedVal;

430 431 432 433 434 435 436 437 438 439 440
static void freeTableCachedVal(void* param) {
  if (param == NULL) {
    return;
  }

  STableCachedVal* pVal = param;
  taosMemoryFree((void*)pVal->pName);
  taosMemoryFree(pVal->pTags);
  taosMemoryFree(pVal);
}

H
Haojun Liao 已提交
441 442
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
443
  pVal->pName = taosStrdup(pMetaReader->me.name);
H
Haojun Liao 已提交
444 445 446 447
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
448
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
449 450 451 452 453 454 455
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

456 457
// Deleter passed to taosLRUCacheInsert() for the table meta cache: releases the cached
// STableCachedVal; the key and its length are not needed for that.
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  freeTableCachedVal(value);
}
458

459 460 461 462 463
/*
 * Set the first `pBlock->info.rows` rows of the result column of every expression in `pExpr` to NULL.
 * The result column of an expression is located through its `base.resSchema.slotId`.
 */
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
  int32_t k = 0;
  while (k < numOfExpr) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pExpr[k].base.resSchema.slotId);
    colDataSetNNULL(pCol, 0, pBlock->info.rows);
    k += 1;
  }
}

468 469
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
470
  // currently only the tbname pseudo column
471
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
472
    return TSDB_CODE_SUCCESS;
473 474
  }

475 476
  int32_t code = 0;

477 478 479 480
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

481
  bool            freeReader = false;
482
  STableCachedVal val = {0};
483 484

  SMetaReader mr = {0};
485
  LRUHandle*  h = NULL;
486

487 488 489
  // todo refactor: extract method
  // the handling of the null data should be packed in the extracted method

490
  // 1. check if it is existed in meta cache
491
  if (pCache == NULL) {
492
    metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
493
    code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
494
    if (code != TSDB_CODE_SUCCESS) {
495
      // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
H
Haojun Liao 已提交
496
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
S
slzhou 已提交
497 498
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);
499 500 501

        // append null value before return to caller, since the caller will ignore this error code and proceed
        doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
502
      } else {
S
slzhou 已提交
503 504
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
H
Haojun Liao 已提交
505
      }
506 507 508 509 510
      metaReaderClear(&mr);
      return terrno;
    }

    metaReaderReleaseLock(&mr);
511

512 513
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
514 515

    freeReader = true;
516
  } else {
517 518
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
519
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
520 521
    if (h == NULL) {
      metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
522
      code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
523
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
524
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
525
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
526
                pBlock->info.id.uid, tstrerror(terrno), idStr);
527 528
          // append null value before return to caller, since the caller will ignore this error code and proceed
          doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
529
        } else {
H
Haojun Liao 已提交
530
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
531
                 idStr);
H
Haojun Liao 已提交
532
        }
533 534 535 536 537 538
        metaReaderClear(&mr);
        return terrno;
      }

      metaReaderReleaseLock(&mr);

H
Haojun Liao 已提交
539
      STableCachedVal* pVal = createTableCacheVal(&mr);
540

H
Haojun Liao 已提交
541
      val = *pVal;
542
      freeReader = true;
H
Haojun Liao 已提交
543

H
Haojun Liao 已提交
544
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
545
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
546 547 548 549 550 551 552 553
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
554

H
Haojun Liao 已提交
555
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
556
    }
H
Haojun Liao 已提交
557

558 559
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
560
  }
561

562 563
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
564
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
565 566

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
567
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
568

569
    int32_t functionId = pExpr1->pExpr->_function.functionId;
570 571 572

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
573
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
574
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
575
      STagVal tagVal = {0};
576 577
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
578

579 580 581 582
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
583
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
584
      }
585

H
Haojun Liao 已提交
586 587
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
588
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
589
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
D
dapan1121 已提交
590
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
H
Haojun Liao 已提交
591 592 593
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
D
dapan1121 已提交
594 595 596 597 598 599
        if (code) {
          if (freeReader) {
            metaReaderClear(&mr);
          }
          return code;
        }
L
Liu Jicong 已提交
600
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
601
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
602
          colDataSetVal(pColInfoData, i, data, false);
H
Haojun Liao 已提交
603
        }
604 605 606 607
      }
    }
  }

608 609
  // restore the rows
  pBlock->info.rows = backupRows;
610 611 612 613
  if (freeReader) {
    metaReaderClear(&mr);
  }

H
Haojun Liao 已提交
614
  return TSDB_CODE_SUCCESS;
615 616
}

H
Haojun Liao 已提交
617
/*
 * Produce the table name column: `name` is put as a varchar value into a one-row temporary column,
 * which is then handed, together with the number of rows of `pBlock`, to the scalar function
 * registered for `functionId`; that function writes its result into `pColInfoData`.
 * If no such function is registered, an error is logged and the destination column is left untouched.
 */
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
  // the table name as a var-string value
  char nameBuf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(nameBuf, name);

  // one-row source column that carries the name
  size_t          bytes = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
  SColumnInfoData srcCol = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, bytes, 1);
  colInfoDataEnsureCapacity(&srcCol, 1, false);
  colDataSetVal(&srcCol, 0, nameBuf, false);

  SScalarParam input = {0};
  input.numOfRows = pBlock->info.rows;
  input.columnData = &srcCol;

  SScalarParam output = {0};
  output.columnData = pColInfoData;

  struct SScalarFuncExecFuncs execFuncs = {0};
  fmGetScalarFuncExecFuncs(functionId, &execFuncs);

  if (execFuncs.process == NULL) {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  } else {
    execFuncs.process(&input, 1, &output);
  }

  colDataDestroy(&srcCol);
}

642
/*
 * Fetch blocks from the tsdb reader until one of them has to be returned to the caller.
 *
 * For every block of the reader loadDataBlock() decides what has to be loaded; blocks that are
 * filtered out or end up without rows are skipped. When a block is returned, the elapsed time and
 * total rows of the operator are updated and the block position (uid and last key) is recorded in
 * `pTaskInfo->streamInfo.lastStatus`.
 *
 * Errors of the reader or of loadDataBlock(), as well as a killed task, do not return: they jump out
 * through T_LONG_JMP.
 *
 * @return the result block of the scan (pTableScanInfo->pResBlock), or NULL when the reader has no
 *         more blocks or the operator has been completed
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*    pBlock = pTableScanInfo->pResBlock;
  bool            hasNext = false;
  int32_t         code = TSDB_CODE_SUCCESS;

  int64_t st = taosGetTimestampUs();

  while (true) {
    code = tsdbNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
    if (code) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }

    if (isTaskKilled(pTaskInfo)) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (pOperator->status == OP_EXEC_DONE) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      break;
    }

    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

    // the group id lookup by table uid is disabled here, the group id is reset to 0 instead
    if (pBlock->info.id.uid) {
      pBlock->info.id.groupId = 0;
    }

    // reuse the function level `code` instead of declaring a second variable that shadows it
    uint32_t status = 0;
    code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    pOperator->resultInfo.totalRows = pTableScanInfo->base.readRecorder.totalRows;
    pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;

    // todo refactor
    // remember the position of the returned block in the stream status of the task
    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
    pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
    pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;

    return pBlock;
  }

  return NULL;
}

H
Haojun Liao 已提交
710
/*
 * Return the next block of the current table list, running the configured number of ascending scans
 * (scanInfo.numOfAsc) followed by the configured number of descending scans (scanInfo.numOfDesc).
 *
 * `scanTimes` counts the finished rounds. Whenever a round ends and another round of the same
 * direction follows, the task status, scan flag and reader are reset for it; before the first
 * descending round the scan is switched to descending order (see prepareForDescendingScan).
 *
 * @return the next block, or NULL if there is no reader, the operator is done, or all rounds are
 *         finished
 */
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pScan->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // do the ascending order traverse in the first place.
  while (pScan->scanTimes < pScan->scanInfo.numOfAsc) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    // this round is exhausted
    pScan->scanTimes += 1;
    if (pScan->scanTimes >= pScan->scanInfo.numOfAsc) {
      continue;
    }

    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pScan->base.scanFlag = MAIN_SCAN;
    pScan->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
    qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));

    // do prepare for the next round table scan operation
    tsdbReaderReset(pScan->base.dataReader, &pScan->base.cond);
  }

  int32_t totalRounds = pScan->scanInfo.numOfAsc + pScan->scanInfo.numOfDesc;
  if (pScan->scanTimes >= totalRounds) {
    return NULL;
  }

  // switch to the descending order once, before the first descending round
  if (pScan->base.cond.order == TSDB_ORDER_ASC) {
    prepareForDescendingScan(&pScan->base, pOperator->exprSupp.pCtx, 0);
    tsdbReaderReset(pScan->base.dataReader, &pScan->base.cond);
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
  }

  do {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    // this round is exhausted
    pScan->scanTimes += 1;
    if (pScan->scanTimes < totalRounds) {
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
      pScan->base.scanFlag = MAIN_SCAN;

      qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
      tsdbReaderReset(pScan->base.dataReader, &pScan->base.cond);
    }
  } while (pScan->scanTimes < totalRounds);

  return NULL;
}

// Return the next data block produced by the table scan operator, or NULL when nothing more is available.
//
// Two modes, selected by pInfo->scanMode:
//  - TABLE_SCAN__TABLE_ORDER: tables are scanned one at a time. When the current table yields no block the
//    reader is re-pointed at the next entry of the table list and the scan is retried.
//  - otherwise: tables are scanned group by group. The reader is opened on the first call (currentGroupId
//    == -1) and re-pointed at each following group once the current group yields no block.
//
// A reader-open failure is reported through T_LONG_JMP on the task environment.
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // scan table one by one sequentially
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    int32_t       numOfTables = 0;
    STableKeyInfo tInfo = {0};

    while (1) {
      SSDataBlock* result = doGroupedTableScan(pOperator);
      if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
        return result;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;

      // the table list is read under the task read latch; the entry is copied out before unlocking
      taosRLockLatch(&pTaskInfo->lock);
      numOfTables = tableListGetSize(pInfo->base.pTableInfoList);

      if (pInfo->currentTable >= numOfTables) {
        qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
        taosRUnLockLatch(&pTaskInfo->lock);
        return NULL;
      }

      tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableInfoList, pInfo->currentTable);
      taosRUnLockLatch(&pTaskInfo->lock);

      tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
             pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));

      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;
    }
  }

  // scan table group by group sequentially
  if (pInfo->currentGroupId == -1) {
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableInfoList)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    int32_t        num = 0;
    STableKeyInfo* pList = NULL;
    tableListGetGroupList(pInfo->base.pTableInfoList, pInfo->currentGroupId, &pList, &num);
    ASSERT(pInfo->base.dataReader == NULL);

    int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
                                  (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
      pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
    }
  }

  SSDataBlock* result = doGroupedTableScan(pOperator);
  if (result != NULL) {
    return result;
  }

  // The current group is exhausted. Keep advancing until some group yields a block or no group is left;
  // a single empty group must not terminate the scan while later groups still hold data.
  while (1) {
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableInfoList)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    // reset value for the next group data output
    pOperator->status = OP_OPENED;
    resetLimitInfoForNextGroup(&pInfo->base.limitInfo);

    int32_t        num = 0;
    STableKeyInfo* pList = NULL;
    tableListGetGroupList(pInfo->base.pTableInfoList, pInfo->currentGroupId, &pList, &num);

    tsdbSetTableList(pInfo->base.dataReader, pList, num);
    tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
    pInfo->scanTimes = 0;

    result = doGroupedTableScan(pOperator);
    if (result != NULL) {
      return result;
    }
  }
}

860 861
// Hand out a heap-allocated copy of the scan operator's file-block load recorder.
//
// pOptr         operator whose info is a STableScanInfo
// pOptrExplain  [out] receives the newly allocated SFileBlockLoadRecorder copy (NULL on failure)
// len           [out] receives sizeof(SFileBlockLoadRecorder) (0 on failure)
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the copy cannot be allocated.
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  STableScanInfo*         pTableScanInfo = pOptr->info;
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
  if (pRecorder == NULL) {
    // do not dereference a failed allocation; report it to the caller instead
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pRecorder = pTableScanInfo->base.readRecorder;
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return TSDB_CODE_SUCCESS;
}

869 870
// Release everything held by a STableScanBase: query condition, data reader, column match list,
// table list, table-meta LRU cache and the pseudo-column expression support.
static void destroyTableScanBase(STableScanBase* pBase) {
  cleanupQueryTableDataCond(&pBase->cond);

  // close the reader and drop the stale pointer
  tsdbReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;

  if (NULL != pBase->matchInfo.pList) {
    taosArrayDestroy(pBase->matchInfo.pList);
  }

  tableListDestroy(pBase->pTableInfoList);
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}

static void destroyTableScanOperatorInfo(void* param) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  blockDataDestroy(pTableScanInfo->pResBlock);
  destroyTableScanBase(&pTableScanInfo->base);
D
dapan1121 已提交
888
  taosMemoryFreeClear(param);
889 890
}

891
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
892
                                           STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
893 894 895
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
896
    goto _error;
H
Haojun Liao 已提交
897 898
  }

899
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
900
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
901 902

  int32_t numOfCols = 0;
903
  int32_t code =
H
Haojun Liao 已提交
904
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
905 906 907 908
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
909
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
910
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
911
  if (code != TSDB_CODE_SUCCESS) {
912
    goto _error;
913 914
  }

H
Haojun Liao 已提交
915
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
916
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
917
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
918
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
919 920
  }

921
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
G
Ganlin Zhao 已提交
922
  pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
H
Haojun Liao 已提交
923

H
Haojun Liao 已提交
924 925
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
926 927
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

928 929
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
930

H
Haojun Liao 已提交
931
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
932
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
X
Xiaoyu Wang 已提交
933
  //  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
934

H
Haojun Liao 已提交
935 936 937
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
938 939
  }

wmmhello's avatar
wmmhello 已提交
940
  pInfo->currentGroupId = -1;
941
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
942
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
943

L
Liu Jicong 已提交
944 945
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
946
  pOperator->exprSupp.numOfExprs = numOfCols;
947

948
  pInfo->base.pTableInfoList = pTableListInfo;
H
Haojun Liao 已提交
949 950
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
951 952 953
    code = terrno;
    goto _error;
  }
954

D
dapan1121 已提交
955 956 957 958
  if (scanDebug) {
    pInfo->countOnly = true;
  }

H
Haojun Liao 已提交
959
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
960 961
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
962 963 964

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
965
  return pOperator;
966

967
_error:
968 969 970
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
971

972 973
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
974
  return NULL;
H
Haojun Liao 已提交
975 976
}

977
// Build a sequential table scan operator around an already opened reader.
//
// pReadHandle  reader stored as the operator's data reader
// pTaskInfo    owning task; receives TSDB_CODE_OUT_OF_MEMORY if allocation fails
//
// Returns the new operator, or NULL when either allocation fails.
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // release whichever allocation succeeded and report the failure instead of dereferencing NULL
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->base.dataReader = pReadHandle;

  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
  return pOperator;
}

990
// Drop every buffered block of the stream scan and rewind the valid-block cursor to 0.
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  int32_t numOfBuffered = (int32_t)taosArrayGetSize(pInfo->pBlockLists);
  qDebug("clear buff blocks:%d", numOfBuffered);

  pInfo->validBlockIndex = 0;
  taosArrayClear(pInfo->pBlockLists);
}

996
// True when the parent operator of this stream scan is a stream session window.
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  return QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == pInfo->windowSup.parentType;
}

1000
// True when the parent operator of this stream scan is a stream state window.
static bool isStateWindow(SStreamScanInfo* pInfo) {
  return QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == pInfo->windowSup.parentType;
}
5
54liuyao 已提交
1003

L
Liu Jicong 已提交
1004
// True when the parent operator is any of the stream interval flavours (plain, semi or final).
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
      return true;
    default:
      return false;
  }
}

// True only for the plain stream interval parent (not the semi/final variants).
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == pInfo->windowSup.parentType;
}

1014 1015 1016 1017
// An interval window whose sliding step differs from its interval length.
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.sliding != pInfo->interval.interval;
}

1018
// Copy the group id found at (groupColIndex, rowIndex) of pBlock into pInfo->groupId.
// The column data is read as an array of uint64_t.
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  uint64_t*        pGroupIds = (uint64_t*)pGroupCol->pData;

  ASSERT(rowIndex < pBlock->info.rows);
  pInfo->groupId = pGroupIds[rowIndex];
}

L
Liu Jicong 已提交
1025
// Re-arm a table scan for a new time window: install the window in the query condition, rewind the
// scan counters and close the current reader so that it is reopened by the next scan.
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
  STableScanBase* pBase = &pTableScanInfo->base;

  pBase->cond.twindows = *pWin;
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;

  tsdbReaderClose(pBase->dataReader);
  qDebug("1");
  pBase->dataReader = NULL;
}

L
Liu Jicong 已提交
1034 1035
// Read the data of one table, restricted to [startTs, endTs] and to versions up to maxVersion, into the
// table scan operator's result block using a temporary reader.
//
// pTableScanOp  table scan operator whose condition, read handle and result block are reused
// tbUid         uid of the table to read
// startTs/endTs time range of the read
// maxVersion    end version of the read (start version is set to -1)
//
// Returns the operator's result block when it holds at least one row, otherwise NULL.
// Reader open / fetch failures set terrno and long-jump out through the task environment.
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};

  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
  // work on a private copy of the condition so the operator's own condition stays untouched
  SQueryTableDataCond cond = pTableScanInfo->base.cond;

  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  STsdbReader* pReader = NULL;
  int32_t      code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
                                     (STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  bool hasNext = false;
  code = tsdbNextDataBlock(pReader, &hasNext);
  if (code != TSDB_CODE_SUCCESS) {
    // the temporary reader is owned by this function only; close it before leaving via long jump
    tsdbReaderClose(pReader);
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  if (hasNext) {
    /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
    pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableInfoList, pBlock->info.id.uid);
  }

  tsdbReaderClose(pReader);
  qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);

  return pBlock->info.rows > 0 ? pBlock : NULL;
}

// Compute the group id of the row of table `uid` at timestamp `ts` (versions up to maxVersion) from its
// column data. Yields 0 when no such row can be read.
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPrev = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (pPrev == NULL || pPrev->info.rows == 0) {
    return 0;
  }

  // a single-timestamp read is expected to produce exactly one row
  ASSERT(pPrev->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPrev, 0);
}

5
54liuyao 已提交
1088
// Look up the group id of table `uid` in the table list of the underlying table scan operator.
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  STableScanInfo* pScan = pInfo->pTableScanOp->info;
  STableScanBase* pBase = &pScan->base;
  return getTableGroupId(pBase->pTableInfoList, uid);
}

5
54liuyao 已提交
1093 1094 1095 1096 1097 1098 1099 1100
// Resolve a group id: from column data when the partition support requires calculation, otherwise
// from the table list by uid.
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1101
// Take the next scan range out of pBlock, starting at *pRowIndex, and re-arm the table scan operator
// for it. Following rows of the same group that share the range's start key (or whose end key equals
// it) are folded into the same range. *pRowIndex is advanced past every consumed row.
//
// Returns false when the block is empty or already fully consumed, true otherwise.
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  if (pBlock->info.rows == 0 || (*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  TSKEY*    startKeys = (TSKEY*)pStartCol->pData;
  TSKEY*    endKeys = (TSKEY*)pEndCol->pData;
  uint64_t* groupIds = (uint64_t*)pGroupCol->pData;
  TSKEY*    calStartKeys = (TSKEY*)pCalStartCol->pData;
  TSKEY*    calEndKeys = (TSKEY*)pCalEndCol->pData;

  int32_t     row = *pRowIndex;
  STimeWindow range = {.skey = startKeys[row], .ekey = endKeys[row]};
  uint64_t    rangeGroup = groupIds[row];

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, row);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartKeys[row];
    pInfo->updateWin.ekey = calEndKeys[row];
  }

  // fold in the following rows that extend the same range
  for (row += 1; row < pBlock->info.rows; row++) {
    if (rangeGroup == groupIds[row]) {
      if (range.skey == startKeys[row]) {
        range.ekey = TMAX(range.ekey, endKeys[row]);
        continue;
      }
      if (range.skey == endKeys[row]) {
        range.skey = TMIN(range.skey, startKeys[row]);
        continue;
      }
    }

    ASSERT(!(range.skey > startKeys[row] && range.ekey < endKeys[row]) ||
           !(isInTimeWindow(&range, startKeys[row], 0) || isInTimeWindow(&range, endKeys[row], 0)));
    break;
  }
  *pRowIndex = row;

  resetTableScanInfo(pInfo->pTableScanOp->info, &range);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1150
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1151
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1152
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1153
  dumyInfo.cur.pageId = -1;
1154
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1155 1156
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1157
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1158

5
54liuyao 已提交
1159
  while (1) {
1160 1161 1162
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1163
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1164 1165 1166 1167 1168
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1169
    }
5
54liuyao 已提交
1170

5
54liuyao 已提交
1171 1172 1173
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1174
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1175
    endWin = preWin;
5
54liuyao 已提交
1176
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1177 1178 1179 1180 1181 1182
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1183

L
Liu Jicong 已提交
1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194
// Scan the ranges listed in pSDB one after another and return the next non-empty block that belongs to
// the group currently being processed (pInfo->groupId).
//
// When all ranges are consumed, pSDB is cleared, *pRowIndex and the update window are reset, the table
// scan reader is closed and NULL is returned.
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  for (;;) {
    SSDataBlock* pRes = doTableScan(pInfo->pTableScanOp);
    if (pRes == NULL && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
      // scan next window data
      pRes = doTableScan(pInfo->pTableScanOp);
    }

    if (pRes == NULL) {
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};

      STableScanInfo* pScanInfo = pInfo->pTableScanOp->info;
      tsdbReaderClose(pScanInfo->base.dataReader);
      qDebug("2");
      pScanInfo->base.dataReader = NULL;
      return NULL;
    }

    doFilter(pRes, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
    if (pRes->info.rows == 0) {
      continue;
    }

    if (!pInfo->partitionSup.needCalc) {
      // group id comes from the block itself
      if (pRes->info.id.groupId == pInfo->groupId) {
        pRes->info.calWin = pInfo->updateWin;
        return pRes;
      }
      continue;
    }

    // group id is calculated per row: rebuild pRes from a copy, keeping only rows of the wanted group
    SSDataBlock* pCopy = createOneDataBlock(pRes, true);
    blockDataCleanup(pRes);
    for (int32_t row = 0; row < pCopy->info.rows; row++) {
      if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pCopy, row) != pInfo->groupId) {
        continue;
      }

      for (int32_t col = 0; col < pInfo->pTableScanOp->exprSupp.numOfExprs; col++) {
        SColumnInfoData* pFrom = taosArrayGet(pCopy->pDataBlock, col);
        SColumnInfoData* pTo = taosArrayGet(pRes->pDataBlock, col);
        bool             isNull = colDataIsNull(pFrom, pCopy->info.rows, row, NULL);
        char*            pVal = colDataGetData(pFrom, row);
        colDataSetVal(pTo, pRes->info.rows, pVal, isNull);
      }
      pRes->info.rows++;
    }

    blockDataDestroy(pCopy);

    if (pRes->info.rows > 0) {
      pRes->info.calWin = pInfo->updateWin;
      return pRes;
    }
  }
}
1236

1237
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
X
Xiaoyu Wang 已提交
1238
                                   SSessionKey* pKey) {
1239 1240 1241
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
X
Xiaoyu Wang 已提交
1242

1243 1244 1245 1246 1247
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
  int32_t          code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
H
Haojun Liao 已提交
1248 1249

  taosMemoryFree(pCur);
1250 1251 1252
  return code;
}

1253
// Translate each row of pSrcBlock (start ts, end ts, uid) into a session-window scan range in pDestBlock.
//
// For every source row the session window covering the start timestamp and the one covering the end
// timestamp are looked up; the output range spans from the first window's start key to the second
// window's end key. Rows whose windows cannot be found are skipped. Output rows are written densely,
// i.e. at index pDestBlock->info.rows, so skipped source rows leave no holes.
//
// Returns TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity.
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int64_t          version = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }
    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // no window at the end timestamp: fall back to the preceding one
      getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    }
    // startWin is known to be valid here; it is endWin that may still be unresolved after the fallback
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // window has been closed.
      qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
      continue;
    }

    // write at the next free output row, not at the source index, because source rows may be skipped
    colDataSetVal(pDestStartCol, pDestBlock->info.rows, (const char*)&startWin.win.skey, false);
    colDataSetVal(pDestEndCol, pDestBlock->info.rows, (const char*)&endWin.win.ekey, false);

    colDataSetNULL(pDestUidCol, pDestBlock->info.rows);
    colDataSetVal(pDestGpCol, pDestBlock->info.rows, (const char*)&groupId, false);
    colDataSetNULL(pDestCalStartTsCol, pDestBlock->info.rows);
    colDataSetNULL(pDestCalEndTsCol, pDestBlock->info.rows);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1307 1308 1309 1310 1311 1312

// Translate the rows of pSrcBlock (start ts, end ts, uid, group id) into interval-window scan ranges in
// pDestBlock, merging consecutive rows through getSlidingWindow.
//
// When the group id must be calculated from data and the first source row covers a time range rather
// than a single timestamp, pSrcBlock is first rewritten: the previous-version rows of that range are
// read back and one (ts, ts, uid, groupId) row is appended per row read.
//
// Returns TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity.
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);

  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
    uint64_t     srcUid = srcUidData[0];
    TSKEY        startTs = srcStartTsCol[0];
    TSKEY        endTs = srcEndTsCol[0];
    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);
    if (pPreRes == NULL) {
      // readPreVersionData returns NULL when no row was read: the rewritten source block is empty,
      // so no scan range is produced
      blockDataCleanup(pSrcBlock);
      return TSDB_CODE_SUCCESS;
    }

    printDataBlock(pPreRes, "pre res");
    blockDataCleanup(pSrcBlock);
    int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    rows = pPreRes->info.rows;

    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
      appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
                                       &groupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }

  // re-read the column buffers: the source block may have been rewritten above
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  srcUidData = (uint64_t*)pSrcUidCol->pData;

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  // i is advanced inside getSlidingWindow by the number of source rows merged into the range
  for (int32_t i = 0; i < rows;) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
    TSKEY calStartTs = srcStartTsCol[i];
    colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
                                       pInfo->partitionSup.needCalc);
    // start timestamp of the last source row merged into this range
    TSKEY       calEndTs = srcStartTsCol[i - 1];
    colDataSetVal(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataSetVal(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1384

1385
/**
 * Build a delete-result block from a source block of (start ts, end ts, uid, group id) rows.
 *
 * pDestBlock is cleaned first. For each source row, a group id of 0 is replaced by the value
 * returned by getGroupIdByData(). When a table-name expression is configured
 * (pInfo->tbnameCalSup.pExprInfo is set), the partition table name looked up in the stream state
 * for that group is attached to the row; otherwise (or when no name is found) the table-name
 * column of the row is set to NULL.
 *
 * @param pInfo      stream scan state
 * @param pSrcBlock  source rows
 * @param pDestBlock block that receives one output row per source row
 * @return TSDB_CODE_SUCCESS, or the error returned by blockDataEnsureCapacity().
 */
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];

    // Var-length string buffer: length header followed by the name bytes. The extra trailing
    // byte is never written by the memcpy below, so strlen() always finds a terminator.
    char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN + 1] = {0};
    bool hasTbName = false;

    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }

    if (pInfo->tbnameCalSup.pExprInfo) {
      void* parTbname = NULL;
      streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);

      // The lookup may leave parTbname untouched; only copy/free when a buffer was handed back.
      if (parTbname != NULL) {
        memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
        size_t nameLen = strlen(varDataVal(tbname));
        varDataSetLen(tbname, nameLen);
        hasTbName = (nameLen > 0);
        tdbFree(parTbname);
      }
    }

    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     hasTbName ? tbname : NULL);
  }
  return TSDB_CODE_SUCCESS;
}

1427 1428 1429 1430
/**
 * Fill pDestBlock with scan ranges derived from pSrcBlock, choosing the generator by window kind:
 * interval windows, session/state windows, or the plain delete-result fallback.
 *
 * Regardless of the generator's result, pDestBlock is tagged as STREAM_CLEAR, inherits the source
 * block version, is marked as loaded and has its timestamp window refreshed from column 0.
 *
 * @return the status code returned by the selected generator.
 */
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t ret;

  if (isIntervalWindow(pInfo)) {
    ret = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
  } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
    ret = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
  } else {
    ret = generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
  }

  SDataBlockInfo* pDestInfo = &pDestBlock->info;
  pDestInfo->type = STREAM_CLEAR;
  pDestInfo->version = pSrcBlock->info.version;
  pDestInfo->dataLoad = 1;

  blockDataUpdateTsWindow(pDestBlock, 0);
  return ret;
}

L
Liu Jicong 已提交
1443 1444 1445
#if 0
// Disabled code: copies the partition tag stored in the stream state for the block's group id
// into pBlock->info.pTag / tagLen.
void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp*    pTagCalSup = &pInfo->tagCalSup;
  SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
  if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return;
  if (pBlock == NULL || pBlock->info.rows == 0) return;

  void*   tag = NULL;
  int32_t tagLen = 0;

  // no tag stored for this group: clear the block's tag pointer
  if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) != 0) {
    pBlock->info.pTag = NULL;
    tdbFree(tag);
    return;
  }

  pBlock->info.tagLen = tagLen;
  void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen);
  if (pTag == NULL) {
    // reallocation failed: drop both the fetched tag and the old buffer
    tdbFree(tag);
    taosMemoryFree(pBlock->info.pTag);
    pBlock->info.pTag = NULL;
    pBlock->info.tagLen = 0;
    return;
  }

  pBlock->info.pTag = pTag;
  memcpy(pBlock->info.pTag, tag, tagLen);
  tdbFree(tag);
}
#endif
L
Liu Jicong 已提交
1472

5
54liuyao 已提交
1473
/**
 * Reset pInfo->pCreateTbRes and, when table-name or tag expressions are configured, append a
 * create-table row for the block's group to it. When neither kind of expression is configured the
 * block's partition table name is cleared instead.
 */
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp*    pTbNameCalSup = &pInfo->tbnameCalSup;
  SExprSupp*    pTagCalSup = &pInfo->tagCalSup;
  SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;

  blockDataCleanup(pInfo->pCreateTbRes);

  if (pTbNameCalSup->numOfExprs != 0 || pTagCalSup->numOfExprs != 0) {
    appendCreateTableRow(pState, pTbNameCalSup, pTagCalSup, pBlock->info.id.groupId, pBlock, 0,
                         pInfo->pCreateTbRes);
    return;
  }

  pBlock->info.parTbName[0] = 0;
}

1485 1486
/**
 * Append one row to a stream "special" block and bump its row count.
 *
 * The start/end timestamps are written both to the window columns and to the calculate-start /
 * calculate-end columns. The table-name column is set to NULL when pTbName is NULL.
 * The function does not grow the block; NOTE(review): callers presumably ensure capacity first.
 *
 * @param pBlock   destination block
 * @param pStartTs pointer to the window start timestamp
 * @param pEndTs   pointer to the window end timestamp
 * @param pUid     pointer to the table uid
 * @param pGp      pointer to the group id
 * @param pTbName  table-name value for the table-name column, or NULL
 */
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  const int64_t row = pBlock->info.rows;
  const char*   startVal = (const char*)pStartTs;
  const char*   endVal = (const char*)pEndTs;

  SColumnInfoData* pWinStart = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  colDataSetVal(pWinStart, row, startVal, false);

  SColumnInfoData* pWinEnd = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  colDataSetVal(pWinEnd, row, endVal, false);

  SColumnInfoData* pUidDst = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  colDataSetVal(pUidDst, row, (const char*)pUid, false);

  SColumnInfoData* pGroupDst = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  colDataSetVal(pGroupDst, row, (const char*)pGp, false);

  SColumnInfoData* pCalcStart = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  colDataSetVal(pCalcStart, row, startVal, false);

  SColumnInfoData* pCalcEnd = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  colDataSetVal(pCalcEnd, row, endVal, false);

  SColumnInfoData* pNameDst = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
  colDataSetVal(pNameDst, row, (const char*)pTbName, pTbName == NULL);

  pBlock->info.rows++;
}

1504
/**
 * Run the update check over every row of pBlock and, when `out` is set, collect the rows that
 * fail it into pInfo->pUpdateDataRes.
 *
 * A row is reported when updateInfoIsUpdated() flags it, or when it falls into an already closed
 * window of a single interval stream that isDeletedStreamWindow() reports as deleted. Rows that
 * are overdue are skipped entirely when pInfo->igExpired is set.
 *
 * @param pInfo      stream scan state
 * @param invertible not referenced by this function
 * @param pBlock     data block to check
 * @param out        when true, pUpdateDataRes is reset, filled, and finally typed as
 *                   STREAM_DELETE_DATA (partition needs calculation) or STREAM_CLEAR
 */
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
    // each input row can produce up to two output rows
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
  }

  SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pTsColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* pTs = (TSKEY*)pTsColInfo->pData;
  bool   inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);

  for (int32_t r = 0; r < pBlock->info.rows; r++) {
    TSKEY* pRowTs = pTs + r;

    bool expired = isOverdue(*pRowTs, &pInfo->twAggSup);
    if (pInfo->igExpired && expired) {
      continue;
    }

    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    bool        winClosed = false;
    if (inserted && expired) {
      SResultRowInfo rowInfo;
      rowInfo.cur.pageId = -1;
      win = getActiveTimeWindow(NULL, &rowInfo, *pRowTs, &pInfo->interval, TSDB_ORDER_ASC);
      winClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }

    // must check update info first.
    bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, *pRowTs);
    bool closedWin = winClosed && isSignleIntervalWindow(pInfo) &&
                     isDeletedStreamWindow(&win, pBlock->info.id.groupId,
                                           pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup);

    if (!out || !(update || closedWin)) {
      continue;
    }

    qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);

    uint64_t gpId = 0;
    appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pRowTs, pRowTs, &pBlock->info.id.uid, &gpId, NULL);

    if (closedWin && pInfo->partitionSup.needCalc) {
      // second row carries the group id computed from the row's own data
      gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, r);
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pRowTs, pRowTs, &pBlock->info.id.uid, &gpId, NULL);
    }
  }

  if (out && pInfo->pUpdateDataRes->info.rows > 0) {
    SSDataBlock* pUpdRes = pInfo->pUpdateDataRes;
    pUpdRes->info.version = pBlock->info.version;
    pUpdRes->info.dataLoad = 1;
    blockDataUpdateTsWindow(pUpdRes, 0);
    if (pInfo->partitionSup.needCalc) {
      pUpdRes->info.type = STREAM_DELETE_DATA;
    } else {
      pUpdRes->info.type = STREAM_CLEAR;
    }
  }
}
L
Liu Jicong 已提交
1551

1552
/**
 * Copy a freshly retrieved data block into the operator's result block (pInfo->pRes).
 *
 * Steps: size pRes for the incoming rows, copy block metadata, map every output column listed in
 * pInfo->matchInfo from the source block (columns missing from the source are filled with NULLs),
 * append pseudo columns, optionally apply the operator filter, refresh the timestamp window and
 * finally compute the partition table name.
 *
 * The resources of pBlock are released with blockDataFreeRes() on every exit path, including the
 * error paths that leave through T_LONG_JMP.
 *
 * @param pInfo  stream scan state owning pRes
 * @param pBlock source block; its resources are freed by this function
 * @param filter apply the operator's filter to pRes when true
 * @return 0 on success; failures do not return but long-jump to pTaskInfo->env with the error code
 */
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // Without enough capacity the column copies below would write past the allocated buffers,
  // so fail the task the same way the pseudo-column error path does.
  int32_t code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataFreeRes((SSDataBlock*)pBlock);
    T_LONG_JMP(pTaskInfo->env, code);
  }

  pInfo->pRes->info.rows = pBlock->info.rows;
  pInfo->pRes->info.id.uid = pBlock->info.id.uid;
  pInfo->pRes->info.type = STREAM_NORMAL;
  pInfo->pRes->info.version = pBlock->info.version;

  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
  pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableInfoList, pBlock->info.id.uid);

  // todo extract method
  const int32_t numOfMatch = (int32_t)taosArrayGetSize(pInfo->matchInfo.pList);
  const int32_t numOfSrcCols = (int32_t)blockDataGetNumOfCols(pBlock);
  for (int32_t i = 0; i < numOfMatch; ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
      continue;
    }

    SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);

    bool colExists = false;
    for (int32_t j = 0; j < numOfSrcCols; ++j) {
      SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
      if (pResCol->info.colId == pColMatchInfo->colId) {
        colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
        colExists = true;
        break;
      }
    }

    // the required column does not exists in submit block, let's set it to be all null value
    if (!colExists) {
      colDataSetNNULL(pDst, 0, pBlockInfo->rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
                                  pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reset the error code.
    terrno = 0;
  }

  if (filter) {
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
  }

  pInfo->pRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
  blockDataFreeRes((SSDataBlock*)pBlock);

  calBlockTbName(pInfo, pInfo->pRes);
  return 0;
}
5
54liuyao 已提交
1617

L
Liu Jicong 已提交
1618
/**
 * Produce the next result block of a queue scan, or NULL when nothing is available right now.
 *
 * Three sources are tried in order:
 *  1. a submit message attached to the task (pTaskInfo->streamInfo.submit);
 *  2. the table scan operator, while the prepare status is TMQ_OFFSET__SNAPSHOT_DATA; if the very
 *     first attempt yields no rows the offset is switched to the log and processing falls through;
 *  3. the log reader, while the prepare status is TMQ_OFFSET__LOG.
 * Any other prepare status is logged as an error.
 */
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;
  const char*      id = GET_TASKID(pTaskInfo);

  qDebug("start to exec queue scan, %s", id);

  // 1. consume the submit message handed to the task, if any
  if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
    if (pInfo->tqReader->msg2.msgStr == NULL) {
      SPackedData submit = pTaskInfo->streamInfo.submit;
      if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
        qError("submit msg messed up when initing stream submit block %p, %s", submit.msgStr, id);
        pInfo->tqReader->msg2 = (SPackedData){0};
        pInfo->tqReader->setMsg = 0;
        ASSERT(0);
      }
    }

    blockDataCleanup(pInfo->pRes);

    while (tqNextDataBlock2(pInfo->tqReader)) {
      SSDataBlock block = {0};

      int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
      if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
        continue;
      }

      setBlockIntoRes(pInfo, &block, true);

      // rows may have been removed by the filter; keep going until something survives
      if (pInfo->pRes->info.rows > 0) {
        return pInfo->pRes;
      }
    }

    // message exhausted: detach it from both the reader and the task
    pInfo->tqReader->msg2 = (SPackedData){0};
    pInfo->tqReader->setMsg = 0;
    pTaskInfo->streamInfo.submit = (SPackedData){0};
    return NULL;
  }

  // 2. snapshot data served by the table scan operator
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult != NULL && pResult->info.rows > 0) {
      qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64 " %s",
             pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey,
             pInfo->tqReader->pWalReader->curVersion, id);
      pTaskInfo->streamInfo.returned = 1;
      return pResult;
    }

    if (pTaskInfo->streamInfo.returned) {
      return NULL;
    }

    // no data has return already, try to extract data in the WAL
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;

    tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);

    qDebug("queue scan tsdb over, switch to wal ver:%" PRId64 " %s", pTaskInfo->streamInfo.snapshotVer + 1, id);
    if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
      tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
      return NULL;
    }
  }

  // 3. log data
  if (pTaskInfo->streamInfo.prepareStatus.type != TMQ_OFFSET__LOG) {
    qError("unexpected streamInfo prepare type: %d %s", pTaskInfo->streamInfo.prepareStatus.type, id);
    return NULL;
  }

  for (;;) {
    SFetchRet ret = {0};
    terrno = 0;

    // if the end is reached, terrno is 0
    if (tqNextBlock(pInfo->tqReader, &ret) < 0 && terrno != 0) {
      qError("failed to get next log block since %s, %s", terrstr(), id);
    }

    if (ret.fetchType == FETCH_TYPE__DATA) {
      blockDataCleanup(pInfo->pRes);
      setBlockIntoRes(pInfo, &ret.data, true);
      if (pInfo->pRes->info.rows > 0) {
        pOperator->status = OP_EXEC_RECV;
        qDebug("queue scan log return %" PRId64 " rows", pInfo->pRes->info.rows);
        return pInfo->pRes;
      }
      continue;
    }

    if (ret.fetchType == FETCH_TYPE__META) {
      qError("unexpected ret.fetchType:%d", ret.fetchType);
      continue;
    }

    bool reachedEnd = (ret.fetchType == FETCH_TYPE__NONE) ||
                      (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV);
    if (reachedEnd) {
      pTaskInfo->streamInfo.lastStatus = ret.offset;
      char formatBuf[80];
      tFormatOffset(formatBuf, sizeof(formatBuf), &ret.offset);
      qDebug("queue scan log return null, offset %s", formatBuf);
      pOperator->status = OP_OPENED;
      return NULL;
    }
  }
}

L
Liu Jicong 已提交
1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744
/**
 * Copy into pDst only those delete rows of pSrc whose uid is present in the reader's table id
 * hash (pInfo->tqReader->tbIdHash).
 *
 * For each kept row the start ts, end ts and uid are copied; the group id and the
 * calculate-start / calculate-end columns are set to NULL. Afterwards pDst->info is overwritten
 * with pSrc->info, except that the row count becomes the number of kept rows and pDst's own
 * capacity is preserved.
 *
 * @param pDst  destination block
 * @param pSrc  source delete block
 * @param pInfo stream scan state providing the tq reader
 * @return 0 on success, or the error returned by blockDataEnsureCapacity() (pDst is then left
 *         without any copied rows).
 */
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;

  // Writing rows without the capacity being in place would overrun the column buffers.
  int32_t code = blockDataEnsureCapacity(pDst, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  uint64_t*        startCol = (uint64_t*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  uint64_t*        endCol = (uint64_t*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDstGpCol = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t j = 0;  // next free row in pDst
  for (int32_t i = 0; i < rows; i++) {
    if (taosHashGet(pReader->tbIdHash, &uidCol[i], sizeof(uint64_t)) == NULL) {
      continue;
    }

    colDataSetVal(pDstStartCol, j, (const char*)&startCol[i], false);
    colDataSetVal(pDstEndCol, j, (const char*)&endCol[i], false);
    colDataSetVal(pDstUidCol, j, (const char*)&uidCol[i], false);

    colDataSetNULL(pDstGpCol, j);
    colDataSetNULL(pDstCalStartCol, j);
    colDataSetNULL(pDstCalEndCol, j);
    j++;
  }

  // take over the source metadata but keep pDst's own capacity
  uint32_t cap = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.rows = j;
  pDst->info.capacity = cap;

  return 0;
}

5
54liuyao 已提交
1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774
// for partition by tag
/**
 * Fill the group-id column of pBlock from each row's uid via getGroupIdByUid().
 * Nothing is done when pInfo->partitionSup.needCalc is set.
 */
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataSetVal(pGpCol, i, (const char*)&groupId, false);
  }
}

5
54liuyao 已提交
1780
/**
 * Run the update check on pBlock (only when update tracking is enabled), advance the maximum
 * seen timestamp to at least endKey, and select the next scan mode from the type of the
 * collected update-result block when that block is not empty.
 */
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
  if (!pInfo->pUpdateInfo) {
    return;
  }

  checkUpdateData(pInfo, true, pBlock, true);
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);

  if (pInfo->pUpdateDataRes->info.rows <= 0) {
    return;
  }

  pInfo->updateResIndex = 0;
  switch (pInfo->pUpdateDataRes->info.type) {
    case STREAM_CLEAR:
      pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      break;
    case STREAM_INVERT:
      pInfo->scanMode = STREAM_SCAN_FROM_RES;
      break;
    case STREAM_DELETE_DATA:
      pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
      break;
    default:
      // other block types leave the scan mode untouched
      break;
  }
}

L
Liu Jicong 已提交
1798 1799 1800 1801 1802
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

L
Liu Jicong 已提交
1803
  qDebug("stream scan called");
H
Haojun Liao 已提交
1804

1805 1806
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
L
Liu Jicong 已提交
1807
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1808
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
1809
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
H
Haojun Liao 已提交
1810 1811 1812 1813
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
      qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1814
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
1815
    } else {
H
Haojun Liao 已提交
1816 1817 1818 1819
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
      qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1820
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
1821
    }
L
Liu Jicong 已提交
1822 1823

    /*resetTableScanInfo(pTSInfo, pWin);*/
H
Haojun Liao 已提交
1824
    tsdbReaderClose(pTSInfo->base.dataReader);
H
Haojun Liao 已提交
1825 1826
    qDebug("4");

H
Haojun Liao 已提交
1827
    pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
1828
    pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1829

L
Liu Jicong 已提交
1830 1831
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
L
Liu Jicong 已提交
1832
    pTaskInfo->streamInfo.recoverScanFinished = false;
L
Liu Jicong 已提交
1833 1834
  }

5
54liuyao 已提交
1835 1836
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
L
Liu Jicong 已提交
1837 1838 1839 1840 1841
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }
5
54liuyao 已提交
1842 1843 1844 1845 1846 1847 1848

    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        printDataBlock(pInfo->pRecoverRes, "scan recover");
        return pInfo->pRecoverRes;
      } break;
5
54liuyao 已提交
1849 1850 1851 1852
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
1853
        printDataBlock(pInfo->pUpdateRes, "recover update");
5
54liuyao 已提交
1854 1855
        return pInfo->pUpdateRes;
      } break;
1856 1857 1858 1859 1860 1861 1862 1863 1864
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        printDataBlock(pInfo->pDeleteDataRes, "recover delete");
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
1865 1866 1867 1868 1869 1870 1871 1872
      case STREAM_SCAN_FROM_DATAREADER_RANGE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
1873
          printDataBlock(pSDB, "scan recover update");
5
54liuyao 已提交
1874 1875 1876 1877 1878 1879
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
5
54liuyao 已提交
1880 1881 1882 1883 1884 1885
      default:
        break;
    }

    pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
    if (pInfo->pRecoverRes != NULL) {
L
Liu Jicong 已提交
1886
      pInfo->blockRecoverContiCnt++;
5
54liuyao 已提交
1887
      calBlockTbName(pInfo, pInfo->pRecoverRes);
1888
      if (pInfo->pUpdateInfo) {
5
54liuyao 已提交
1889 1890 1891 1892 1893 1894
        if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
          TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
        } else {
          doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
        }
1895
      }
5
54liuyao 已提交
1896 1897
      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
1898
        printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
5
54liuyao 已提交
1899 1900
        return pInfo->pCreateTbRes;
      }
D
dapan1121 已提交
1901
      qDebug("stream recover scan get block, rows %" PRId64 , pInfo->pRecoverRes->info.rows);
5
54liuyao 已提交
1902 1903
      printDataBlock(pInfo->pRecoverRes, "scan recover");
      return pInfo->pRecoverRes;
L
Liu Jicong 已提交
1904 1905
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
L
Liu Jicong 已提交
1906
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1907
    tsdbReaderClose(pTSInfo->base.dataReader);
H
Haojun Liao 已提交
1908 1909
    qDebug("5");

H
Haojun Liao 已提交
1910
    pTSInfo->base.dataReader = NULL;
1911

H
Haojun Liao 已提交
1912 1913
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
L
Liu Jicong 已提交
1914

L
Liu Jicong 已提交
1915
    pTaskInfo->streamInfo.recoverScanFinished = true;
L
Liu Jicong 已提交
1916 1917 1918
    return NULL;
  }

5
54liuyao 已提交
1919
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
5
54liuyao 已提交
1920
// TODO: refactor
L
Liu Jicong 已提交
1921
FETCH_NEXT_BLOCK:
L
Liu Jicong 已提交
1922
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
1923
    if (pInfo->validBlockIndex >= total) {
L
Liu Jicong 已提交
1924
      doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
1925
      /*pOperator->status = OP_EXEC_DONE;*/
H
Haojun Liao 已提交
1926 1927 1928
      return NULL;
    }

1929
    int32_t      current = pInfo->validBlockIndex++;
L
Liu Jicong 已提交
1930 1931
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
5
54liuyao 已提交
1932
    if (pBlock->info.parTbName[0]) {
H
Haojun Liao 已提交
1933
      streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
1934
    }
1935
    // TODO move into scan
5
54liuyao 已提交
1936 1937
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
1938
    pBlock->info.dataLoad = 1;
1939
    blockDataUpdateTsWindow(pBlock, 0);
1940
    switch (pBlock->info.type) {
L
Liu Jicong 已提交
1941 1942 1943
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
1944 1945 1946
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
1947 1948
        copyDataBlock(pInfo->pUpdateRes, pBlock);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
1949 1950 1951
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
1952
        printDataBlock(pBlock, "stream scan delete recv");
L
Liu Jicong 已提交
1953
        SSDataBlock* pDelBlock = NULL;
L
Liu Jicong 已提交
1954
        if (pInfo->tqReader) {
L
Liu Jicong 已提交
1955
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
L
Liu Jicong 已提交
1956
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
L
Liu Jicong 已提交
1957 1958
        } else {
          pDelBlock = pBlock;
L
Liu Jicong 已提交
1959
        }
5
54liuyao 已提交
1960 1961
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
5
54liuyao 已提交
1962 1963 1964 1965 1966 1967
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
1968
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
L
Liu Jicong 已提交
1969
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
1970
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
L
Liu Jicong 已提交
1971
          printDataBlock(pDelBlock, "stream scan delete result");
H
Haojun Liao 已提交
1972 1973
          blockDataDestroy(pDelBlock);

L
Liu Jicong 已提交
1974 1975 1976 1977 1978
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1979 1980 1981
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
L
Liu Jicong 已提交
1982
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
1983 1984 1985
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
L
Liu Jicong 已提交
1986 1987 1988 1989
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
L
Liu Jicong 已提交
1990
          if (pInfo->pDeleteDataRes->info.rows > 0) {
5
54liuyao 已提交
1991
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
L
Liu Jicong 已提交
1992 1993 1994 1995
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1996
        }
1997 1998 1999
      } break;
      default:
        break;
5
54liuyao 已提交
2000
    }
2001
    // printDataBlock(pBlock, "stream scan recv");
2002
    return pBlock;
L
Liu Jicong 已提交
2003
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
L
Liu Jicong 已提交
2004
    qDebug("scan mode %d", pInfo->scanMode);
5
54liuyao 已提交
2005 2006 2007
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
5
54liuyao 已提交
2008
        doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
5
54liuyao 已提交
2009 2010 2011
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
2012 2013 2014
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
5
54liuyao 已提交
2015
      } break;
2016
      case STREAM_SCAN_FROM_DELETE_DATA: {
2017 2018 2019 2020 2021 2022 2023
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
2024 2025 2026 2027 2028 2029 2030 2031 2032 2033
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
2034
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
2035 2036
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
5
54liuyao 已提交
2037 2038
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
2039
          // printDataBlock(pSDB, "stream scan update");
L
Liu Jicong 已提交
2040
          calBlockTbName(pInfo, pSDB);
5
54liuyao 已提交
2041 2042
          return pSDB;
        }
2043
        blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
2044 2045 2046 2047
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
2048
    }
2049

2050
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
5
54liuyao 已提交
2051
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
2052 2053
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
5
54liuyao 已提交
2054 2055
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
2056
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
2057
      pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
2058
      return pInfo->pUpdateRes;
5
54liuyao 已提交
2059
    }
5
54liuyao 已提交
2060

H
Haojun Liao 已提交
2061 2062
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

2063
    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
2064

L
Liu Jicong 已提交
2065
  NEXT_SUBMIT_BLK:
2066
    while (1) {
L
Liu Jicong 已提交
2067
      if (pInfo->tqReader->msg2.msgStr == NULL) {
2068
        if (pInfo->validBlockIndex >= totBlockNum) {
5
54liuyao 已提交
2069
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
L
Liu Jicong 已提交
2070
          doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
2071
          qDebug("stream scan return empty, consume block %d", totBlockNum);
2072 2073
          return NULL;
        }
2074

L
Liu Jicong 已提交
2075 2076
        int32_t      current = pInfo->validBlockIndex++;
        SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
L
Liu Jicong 已提交
2077
        /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
L
Liu Jicong 已提交
2078
        if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
2079 2080 2081 2082
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          continue;
        }
H
Haojun Liao 已提交
2083 2084
      }

2085 2086
      blockDataCleanup(pInfo->pRes);

L
Liu Jicong 已提交
2087
      while (tqNextDataBlock2(pInfo->tqReader)) {
2088
        SSDataBlock block = {0};
2089

2090
        int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
2091 2092 2093 2094 2095

        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

2096
        setBlockIntoRes(pInfo, &block, false);
2097

H
Haojun Liao 已提交
2098
        if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId,
L
Liu Jicong 已提交
2099
                             pInfo->pRes->info.version)) {
2100 2101 2102 2103 2104
          printDataBlock(pInfo->pRes, "stream scan ignore");
          blockDataCleanup(pInfo->pRes);
          continue;
        }

5
54liuyao 已提交
2105 2106 2107
        if (pInfo->pCreateTbRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return pInfo->pCreateTbRes;
2108 2109
        }

5
54liuyao 已提交
2110
        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
H
Haojun Liao 已提交
2111
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
2112
        pInfo->pRes->info.dataLoad = 1;
2113 2114 2115
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
2116 2117 2118
          break;
        }
      }
2119
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
5
54liuyao 已提交
2120
        break;
J
jiacy-jcy 已提交
2121
      } else {
2122
        continue;
5
54liuyao 已提交
2123
      }
H
Haojun Liao 已提交
2124 2125 2126 2127
    }

    // record the scan action.
    pInfo->numOfExec++;
2128
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
2129
    // printDataBlock(pInfo->pRes, "stream scan");
H
Haojun Liao 已提交
2130

D
dapan1121 已提交
2131
    qDebug("scan rows: %" PRId64 , pBlockInfo->rows);
L
Liu Jicong 已提交
2132 2133 2134
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }
2135 2136 2137 2138 2139 2140

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
L
Liu Jicong 已提交
2141 2142 2143
  } else {
    ASSERT(0);
    return NULL;
H
Haojun Liao 已提交
2144 2145 2146
  }
}

H
Haojun Liao 已提交
2147
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2148 2149 2150
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2151 2152 2153
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2154 2155 2156 2157 2158 2159
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2160
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
L
Liu Jicong 已提交
2161 2162
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
2163
  SStreamRawScanInfo* pInfo = pOperator->info;
D
dapan1121 已提交
2164
  int32_t             code = TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
2165
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;  // use metaRspLen !=0 to judge if data is meta
wmmhello's avatar
wmmhello 已提交
2166
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
2167

wmmhello's avatar
wmmhello 已提交
2168
  qDebug("tmqsnap doRawScan called");
L
Liu Jicong 已提交
2169
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
D
dapan1121 已提交
2170 2171 2172 2173 2174 2175 2176 2177 2178 2179
    bool hasNext = false;
    if (pInfo->dataReader) {
      code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
      if (code) {
        tsdbReleaseDataBlock(pInfo->dataReader);
        longjmp(pTaskInfo->env, code);
      }
    }
    
    if (pInfo->dataReader && hasNext) {
wmmhello's avatar
wmmhello 已提交
2180
      if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2181
        tsdbReleaseDataBlock(pInfo->dataReader);
2182
        longjmp(pTaskInfo->env, pTaskInfo->code);
wmmhello's avatar
wmmhello 已提交
2183
      }
2184

H
Haojun Liao 已提交
2185 2186
      SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
      if (pBlock == NULL) {
wmmhello's avatar
wmmhello 已提交
2187
        longjmp(pTaskInfo->env, terrno);
wmmhello's avatar
wmmhello 已提交
2188 2189
      }

H
Haojun Liao 已提交
2190
      qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
wmmhello's avatar
wmmhello 已提交
2191
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
H
Haojun Liao 已提交
2192
      pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
wmmhello's avatar
wmmhello 已提交
2193 2194 2195
      pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
      return pBlock;
    }
wmmhello's avatar
wmmhello 已提交
2196 2197

    SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
L
Liu Jicong 已提交
2198
    if (mtInfo.uid == 0) {  // read snapshot done, change to get data from wal
wmmhello's avatar
wmmhello 已提交
2199 2200
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
wmmhello's avatar
wmmhello 已提交
2201 2202
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
      pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
L
Liu Jicong 已提交
2203
    } else {
wmmhello's avatar
wmmhello 已提交
2204 2205
      pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
      pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
2206
      qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
wmmhello's avatar
wmmhello 已提交
2207 2208
      qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
    }
2209
    tDeleteSSchemaWrapper(mtInfo.schema);
wmmhello's avatar
wmmhello 已提交
2210
    qDebug("tmqsnap stream scan tsdb return null");
wmmhello's avatar
wmmhello 已提交
2211
    return NULL;
L
Liu Jicong 已提交
2212 2213 2214 2215 2216 2217 2218
  } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
    SSnapContext* sContext = pInfo->sContext;
    void*         data = NULL;
    int32_t       dataLen = 0;
    int16_t       type = 0;
    int64_t       uid = 0;
    if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
wmmhello's avatar
wmmhello 已提交
2219
      qError("tmqsnap getMetafromSnapShot error");
wmmhello's avatar
wmmhello 已提交
2220
      taosMemoryFreeClear(data);
2221 2222 2223
      return NULL;
    }

L
Liu Jicong 已提交
2224
    if (!sContext->queryMetaOrData) {  // change to get data next poll request
wmmhello's avatar
wmmhello 已提交
2225 2226 2227 2228
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
      pTaskInfo->streamInfo.lastStatus.uid = uid;
      pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
wmmhello's avatar
wmmhello 已提交
2229
      pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
L
Liu Jicong 已提交
2230
    } else {
wmmhello's avatar
wmmhello 已提交
2231 2232 2233 2234 2235 2236 2237
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
      pTaskInfo->streamInfo.lastStatus.uid = uid;
      pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
      pTaskInfo->streamInfo.metaRsp.resMsgType = type;
      pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
      pTaskInfo->streamInfo.metaRsp.metaRsp = data;
    }
2238

wmmhello's avatar
wmmhello 已提交
2239
    return NULL;
2240
  }
L
Liu Jicong 已提交
2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278
  //  else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
  //    int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
  //
  //    while(1){
  //      if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
  //        qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
  //        pTaskInfo->streamInfo.lastStatus.version = fetchVer;
  //        pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
  //        return NULL;
  //      }
  //      SWalCont* pHead = &pInfo->pCkHead->head;
  //      qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
  //
  //      if (pHead->msgType == TDMT_VND_SUBMIT) {
  //        SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
  //        tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
  //        SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid,
  //        &pInfo->pRes); if(block){
  //          pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
  //          pTaskInfo->streamInfo.lastStatus.version = fetchVer;
  //          qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
  //          return block;
  //        }else{
  //          fetchVer++;
  //        }
  //      } else{
  //        ASSERT(pInfo->sContext->withMeta);
  //        ASSERT(IS_META_MSG(pHead->msgType));
  //        qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
  //        pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
  //        pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
  //        pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
  //        pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
  //        pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
  //        memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
  //        return NULL;
  //      }
  //    }
2279 2280 2281
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2282
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2283 2284 2285 2286 2287 2288
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2289 2290 2291
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2292
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2293 2294 2295 2296 2297
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2298 2299
  int32_t code = TSDB_CODE_SUCCESS;

2300
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2301
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2302
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2303 2304
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2305 2306
  }

wmmhello's avatar
wmmhello 已提交
2307 2308
  pInfo->vnode = pHandle->vnode;

2309
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2310 2311
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2312

2313
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2314
  return pOperator;
H
Haojun Liao 已提交
2315

L
Liu Jicong 已提交
2316
_end:
H
Haojun Liao 已提交
2317 2318 2319 2320
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2321 2322
}

2323
static void destroyStreamScanOperatorInfo(void* param) {
2324
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
2325

2326
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
5
54liuyao 已提交
2327
    destroyOperatorInfo(pStreamScan->pTableScanOp);
2328 2329 2330 2331
  }
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
H
Haojun Liao 已提交
2332 2333
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2334
  }
C
Cary Xu 已提交
2335 2336
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2337
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2338
  }
C
Cary Xu 已提交
2339

L
Liu Jicong 已提交
2340
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
5
54liuyao 已提交
2341
  cleanupExprSupp(&pStreamScan->tagCalSup);
L
Liu Jicong 已提交
2342

L
Liu Jicong 已提交
2343
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2344 2345 2346 2347
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2348
  blockDataDestroy(pStreamScan->pUpdateDataRes);
5
54liuyao 已提交
2349
  blockDataDestroy(pStreamScan->pCreateTbRes);
2350 2351 2352 2353
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2354
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2355
                                            STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2356
  SArray*          pColIds = NULL;
2357 2358
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2359

H
Haojun Liao 已提交
2360
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2361
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2362
    goto _error;
H
Haojun Liao 已提交
2363 2364
  }

2365
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2366
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2367

2368
  pInfo->pTagCond = pTagCond;
2369
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2370

2371
  int32_t numOfCols = 0;
2372 2373
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2374 2375 2376
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2377

H
Haojun Liao 已提交
2378
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2379
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2380
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2381
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2382 2383

    int16_t colId = id->colId;
2384
    taosArrayPush(pColIds, &colId);
2385
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2386
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2387
    }
H
Haojun Liao 已提交
2388 2389
  }

L
Liu Jicong 已提交
2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
      goto _error;
    }
  }

2403 2404
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
5
54liuyao 已提交
2405
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
2406 2407 2408 2409 2410 2411 2412 2413 2414 2415
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  }

L
Liu Jicong 已提交
2416
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
H
Haojun Liao 已提交
2417
  if (pInfo->pBlockLists == NULL) {
2418 2419
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
H
Haojun Liao 已提交
2420 2421
  }

5
54liuyao 已提交
2422
  if (pHandle->vnode) {
2423
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
L
Liu Jicong 已提交
2424
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2425
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2426
      pTSInfo->base.cond.endVersion = pHandle->version;
2427
    }
L
Liu Jicong 已提交
2428

2429
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2430
    int32_t        num = 0;
2431
    tableListGetGroupList(pTableListInfo, 0, &pList, &num);
2432

2433
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2434
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2435
      pTSInfo->base.dataReader = NULL;
2436
      pTaskInfo->streamInfo.lastStatus.uid = -1;
L
Liu Jicong 已提交
2437 2438
    }

L
Liu Jicong 已提交
2439 2440 2441 2442
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2443
    } else {
L
Liu Jicong 已提交
2444 2445
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2446 2447
    }

2448
    pInfo->pUpdateInfo = NULL;
2449
    pInfo->pTableScanOp = pTableScanOp;
2450 2451 2452
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2453

L
Liu Jicong 已提交
2454 2455
    pInfo->readHandle = *pHandle;
    pInfo->tableUid = pScanPhyNode->uid;
L
Liu Jicong 已提交
2456
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
5
54liuyao 已提交
2457 2458
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
L
Liu Jicong 已提交
2459

L
Liu Jicong 已提交
2460
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2461
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
2462
    SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableInfoList);
2463
    code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2464 2465 2466 2467 2468
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2469
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2470 2471
  } else {
    taosArrayDestroy(pColIds);
H
Haojun Liao 已提交
2472
    pColIds = NULL;
5
54liuyao 已提交
2473 2474
  }

2475 2476 2477 2478 2479
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2480 2481 2482 2483 2484
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2485
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2486
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2487
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2488
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2489
  pInfo->groupId = 0;
2490
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2491
  pInfo->pStreamScanOp = pOperator;
2492
  pInfo->deleteDataIndex = 0;
2493
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2494
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2495
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2496
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2497
  pInfo->partitionSup.needCalc = false;
5
54liuyao 已提交
2498 2499
  pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
  pInfo->igExpired = pTableScanNode->igExpired;
2500
  pInfo->twAggSup.maxTs = INT64_MIN;
L
Liu Jicong 已提交
2501

L
Liu Jicong 已提交
2502 2503
  setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2504
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2505

L
Liu Jicong 已提交
2506
  __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2507 2508
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2509

H
Haojun Liao 已提交
2510
  return pOperator;
2511

L
Liu Jicong 已提交
2512
_error:
H
Haojun Liao 已提交
2513 2514 2515 2516 2517 2518 2519 2520
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2521 2522
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2523 2524
}

2525
/*
 * Next-block function of the tag scan operator.
 *
 * Starting at pInfo->curPos, emits one row per table until either the table list or the
 * result capacity is exhausted. For each output expression the row holds the table name
 * (scan pseudo column functions) or the value of the referenced tag.
 *
 * @return pInfo->pRes when at least one row was produced, otherwise NULL. A failed meta
 *         lookup leaves through T_LONG_JMP with terrno.
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprs = pOperator->exprSupp.pExprInfo;
  SSDataBlock*   pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  int32_t numOfTables = tableListGetSize(pInfo->pTableInfoList);
  if (numOfTables == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        nameBuf[512] = {0};
  int32_t     rows = 0;
  SMetaReader mr = {0};
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < numOfTables && rows < pOperator->resultInfo.capacity) {
    STableKeyInfo* pKey = tableListGetInfo(pInfo->pTableInfoList, pInfo->curPos);

    int32_t code = metaGetTableEntryByUid(&mr, pKey->uid);
    tDecoderClear(&mr.coder);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKey->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&mr);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    for (int32_t col = 0; col < pOperator->exprSupp.numOfExprs; ++col) {
      SExprInfo*       pExpr = &pExprs[col];
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExpr->base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
        STR_TO_VARSTR(nameBuf, mr.me.name);
        colDataSetVal(pDst, rows, nameBuf, false);
        continue;
      }

      // otherwise the expression refers to a tag value
      STagVal val = {0};
      val.cid = pExpr->base.pParam[0].pCol->colId;
      const char* pTag = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val);

      const bool isJson = (pDst->info.type == TSDB_DATA_TYPE_JSON);
      // non-JSON tags found in the entry are converted; JSON tags are passed through as is
      const bool converted = !isJson && pTag != NULL;

      char* pData = converted ? tTagValToData((const STagVal*)pTag, false) : (char*)pTag;
      bool  isNull = (pData == NULL) || (isJson && tTagIsJsonNull(pData));
      colDataSetVal(pDst, rows, pData, isNull);

      // the converted buffer is released here only for variable-length tag types
      if (converted && IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type) && pData != NULL) {
        taosMemoryFree(pData);
      }
    }

    rows += 1;
    pInfo->curPos += 1;
    if (pInfo->curPos >= numOfTables) {
      setOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&mr);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = rows;
  pOperator->resultInfo.totalRows += rows;

  if (pRes->info.rows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

2606
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2607 2608
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2609
  taosArrayDestroy(pInfo->matchInfo.pList);
D
dapan1121 已提交
2610
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2611 2612
}

S
slzhou 已提交
2613 2614
// Create the tag scan operator.
//
// pReadHandle  read handle; copied by value into the operator's private info
// pPhyNode     physical plan node providing the pseudo-column list and the
//              output data block descriptor
// pTaskInfo    owning task, handed to setOperatorInfo()
//
// Returns the new operator, or NULL on failure. On failure terrno carries the
// code reported by the step that failed (TSDB_CODE_OUT_OF_MEMORY for failed
// allocations) and everything owned by the private info is released.
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
                                         SExecTaskInfo* pTaskInfo) {
  // Initialised before the first "goto _error" so the error path always reads a
  // defined value; allocation failure is the default reason.
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;

  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
  if (pInfo->pRes == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;

  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  initResultSizeInfo(&pOperator->resultInfo, 4096);

  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);

  return pOperator;

_error:
  if (pInfo != NULL) {
    // releases pRes, the column match list and pInfo itself
    destroyTagScanOperatorInfo(pInfo);
  }

  // NOTE(review): whatever initExprSupp() attached to pOperator->exprSupp is not
  // released here; confirm which cleanup routine owns it on this path.
  taosMemoryFree(pOperator);
  terrno = code;
  return NULL;
}
2656

dengyihao's avatar
dengyihao 已提交
2657
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2658 2659 2660 2661 2662 2663
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
2664
  int32_t                         code = 0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2665

H
Haojun Liao 已提交
2666
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2667

L
Liu Jicong 已提交
2668
  int64_t      st = taosGetTimestampUs();
2669
  void*        p = tableListGetInfo(pInfo->base.pTableInfoList, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2670
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2671

D
dapan1121 已提交
2672
  if (NULL == source->dataReader || !source->multiReader) {
D
dapan1121 已提交
2673
    code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false);
2674 2675 2676
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
dengyihao's avatar
dengyihao 已提交
2677
  }
2678 2679
  
  pInfo->base.dataReader = source->dataReader;
H
Haojun Liao 已提交
2680
  STsdbReader* reader = pInfo->base.dataReader;
D
dapan1121 已提交
2681
  bool hasNext = false;
2682
  qTrace("tsdb/read-table-data: %p, enter next reader", reader);
D
dapan1121 已提交
2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695

  while (true) {
    code = tsdbNextDataBlock(reader, &hasNext);
    if (code != 0) {
      tsdbReleaseDataBlock(reader);
      pInfo->base.dataReader = NULL;
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
  
H
Haojun Liao 已提交
2696
    if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2697
      tsdbReleaseDataBlock(reader);
D
dapan1121 已提交
2698
      pInfo->base.dataReader = NULL;
2699
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2700 2701 2702
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2703
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2704 2705 2706 2707
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2708
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2709 2710 2711 2712
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2713 2714

    uint32_t status = 0;
2715
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2716
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2717
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2718
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2719 2720 2721 2722 2723 2724 2725
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

2726
    pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableInfoList, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2727

H
Haojun Liao 已提交
2728
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2729
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2730

2731
    qTrace("tsdb/read-table-data: %p, close reader", reader);
D
dapan1121 已提交
2732 2733 2734 2735
    if (!source->multiReader) {
      tsdbReaderClose(pInfo->base.dataReader);
      source->dataReader = NULL;
    }
H
Haojun Liao 已提交
2736
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2737 2738
    return pBlock;
  }
H
Haojun Liao 已提交
2739

D
dapan1121 已提交
2740 2741 2742 2743
  if (!source->multiReader) {
    tsdbReaderClose(pInfo->base.dataReader);
    source->dataReader = NULL;
  }
H
Haojun Liao 已提交
2744
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2745 2746 2747
  return NULL;
}

2748 2749 2750
// Build the single-entry sort specification used to order rows by the primary
// timestamp column.
//
// colMatchInfo  array of SColMatchItem; the destination slot of the entry whose
//               colId is PRIMARYKEY_TIMESTAMP_COL_ID is used (slot 0 when no
//               entry matches, the last one when several match)
// order         value stored in SBlockOrderInfo.order
//
// Returns a new SArray holding one SBlockOrderInfo, owned by the caller, or
// NULL when the array cannot be allocated or filled.
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsTargetSlotId = 0;

  size_t numOfItems = taosArrayGetSize(colMatchInfo);
  for (size_t i = 0; i < numOfItems; ++i) {
    SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
    if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      tsTargetSlotId = colInfo->dstSlotId;
    }
  }

  SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  if (pList == NULL) {
    return NULL;
  }

  SBlockOrderInfo bi = {0};
  bi.order = order;
  bi.slotId = tsTargetSlotId;
  bi.nullFirst = NULL_ORDER_FIRST;

  if (taosArrayPush(pList, &bi) == NULL) {
    taosArrayDestroy(pList);
    return NULL;
  }

  return pList;
}

H
Haojun Liao 已提交
2768
// Copy a query condition, giving the copy its own colList array.
//
// All fields are copied by value; only colList is duplicated, any other pointer
// members stay shared with `src`. The caller releases dst->colList with
// taosMemoryFree().
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the column list
// cannot be allocated; in that case dst->colList is NULL.
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
  memcpy((void*)dst, (const void*)src, sizeof(SQueryTableDataCond));

  // never leave the copy pointing at the source's column list
  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  if (dst->colList == NULL) {
    // a NULL result for an empty column list is not a failure
    return (src->numOfCols > 0) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < src->numOfCols; i++) {
    dst->colList[i] = src->colList[i];
  }

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2776

2777
// Start the merge scan of the table group that begins at pInfo->tableStartIndex.
//
// Determines the last table of the group (consecutive entries of the table list
// sharing pInfo->groupId), creates the sort handle, registers one sort source
// and one private query condition per table, and opens the sort.
//
// Returns TSDB_CODE_SUCCESS. Failures leave through T_LONG_JMP on the task's
// jump buffer.
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  {
    size_t  numOfTables = tableListGetSize(pInfo->base.pTableInfoList);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < numOfTables; ++i) {
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableInfoList, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  pInfo->base.dataReader = NULL;

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  bool    multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM);
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  for (int32_t i = 0; i < numOfTable; ++i) {
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.multiReader = multiReader;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);

    taosArrayPush(pInfo->sortSourceParams, &param);

    SQueryTableDataCond cond;
    int32_t             condCode = dumpQueryTableCond(&pInfo->base.cond, &cond);

    // Pushed even when the copy failed: the cleanup loops size themselves on
    // queryConds, so both arrays must stay the same length.
    taosArrayPush(pInfo->queryConds, &cond);
    if (condCode != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, condCode);
    }
  }

  // The sources are registered in a second pass: the pointers taken from
  // sortSourceParams are only stable once nothing more is pushed onto it.
  for (int32_t i = 0; i < numOfTable; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    ps->onlyRef = true;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  return TSDB_CODE_SUCCESS;
}

// Finish the merge scan of the current table group: folds the sort handle's
// execution statistics into pInfo->sortExecInfo, releases the per-table input
// blocks, readers and query conditions, destroys the sort handle and resets the
// limit info for the next group. Always returns TSDB_CODE_SUCCESS.
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pScan = pOperator->info;

  int32_t numOfSources = taosArrayGetSize(pScan->queryConds);

  // method and buffer size are overwritten; loops and byte counters accumulate
  SSortExecInfo groupStat = tsortGetSortExecInfo(pScan->pSortHandle);
  pScan->sortExecInfo.sortMethod = groupStat.sortMethod;
  pScan->sortExecInfo.sortBuffer = groupStat.sortBuffer;
  pScan->sortExecInfo.loops += groupStat.loops;
  pScan->sortExecInfo.readBytes += groupStat.readBytes;
  pScan->sortExecInfo.writeBytes += groupStat.writeBytes;

  int32_t idx = 0;
  while (idx < numOfSources) {
    STableMergeScanSortSourceParam* pSrc = taosArrayGet(pScan->sortSourceParams, idx);
    blockDataDestroy(pSrc->inputBlock);
    tsdbReaderClose(pSrc->dataReader);
    pSrc->dataReader = NULL;
    ++idx;
  }
  taosArrayClear(pScan->sortSourceParams);

  tsortDestroySortHandle(pScan->pSortHandle);
  pScan->pSortHandle = NULL;

  // each query condition owns its own column list
  for (int32_t k = 0; k < taosArrayGetSize(pScan->queryConds); k++) {
    SQueryTableDataCond* pCond = taosArrayGet(pScan->queryConds, k);
    taosMemoryFree(pCond->colList);
  }
  taosArrayDestroy(pScan->queryConds);
  pScan->queryConds = NULL;

  resetLimitInfoForNextGroup(&pScan->limitInfo);
  return TSDB_CODE_SUCCESS;
}

2878 2879
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2880 2881
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2882 2883 2884
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2885
  blockDataCleanup(pResBlock);
2886 2887

  while (1) {
2888
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2889 2890 2891 2892
    if (pTupleHandle == NULL) {
      break;
    }

2893 2894
    appendOneRowToDataBlock(pResBlock, pTupleHandle);
    if (pResBlock->info.rows >= capacity) {
2895 2896 2897 2898
      break;
    }
  }

2899
  bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
D
dapan1121 已提交
2900
  qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
2901
         pInfo->limitInfo.numOfOutputRows);
2902

2903
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915
}

// "Get next" function of the table merge scan operator.
//
// Tables are processed group by group. On the first call the first group is
// started. Each call returns the next sorted block of the current group, tagged
// with that group's id. When a group is drained it is stopped and the next one
// is started. After the last group the operator is marked completed and NULL is
// returned.
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  STableMergeScanInfo* pScan = pOperator->info;

  int32_t openCode = pOperator->fpSet._openFn(pOperator);
  if (openCode != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTask->env, openCode);
  }

  size_t tableListSize = tableListGetSize(pScan->base.pTableInfoList);

  // first invocation: pick the group of the first table
  if (!pScan->hasGroupId) {
    pScan->hasGroupId = true;

    if (tableListSize == 0) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pScan->tableStartIndex = 0;
    STableKeyInfo* pFirst = tableListGetInfo(pScan->base.pTableInfoList, pScan->tableStartIndex);
    pScan->groupId = pFirst->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pScan->tableStartIndex < tableListSize) {
    if (isTaskKilled(pTask)) {
      T_LONG_JMP(pTask->env, pTask->code);
    }

    SSDataBlock* pOut = getSortedTableMergeScanBlockData(pScan->pSortHandle, pScan->pResBlock,
                                                         pOperator->resultInfo.capacity, pOperator);
    if (pOut != NULL) {
      pOut->info.id.groupId = pScan->groupId;
      pOperator->resultInfo.totalRows += pOut->info.rows;
      return pOut;
    }

    // Data of this group are all dumped, let's try the next group
    stopGroupTableMergeScan(pOperator);
    if (pScan->tableEndIndex >= tableListSize - 1) {
      setOperatorCompleted(pOperator);
      break;
    }

    pScan->tableStartIndex = pScan->tableEndIndex + 1;
    pScan->groupId = tableListGetInfo(pScan->base.pTableInfoList, pScan->tableStartIndex)->groupId;
    startGroupTableMergeScan(pOperator);
    resetLimitInfoForNextGroup(&pScan->limitInfo);
  }

  // reached only when no block is left to hand out
  return NULL;
}

2962
void destroyTableMergeScanOperatorInfo(void* param) {
2963
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2964
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2965

dengyihao's avatar
dengyihao 已提交
2966 2967 2968
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2969 2970
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
2971 2972
    tsdbReaderClose(p->dataReader);
    p->dataReader = NULL;
2973
  }
H
Haojun Liao 已提交
2974

2975
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2976 2977
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2978

dengyihao's avatar
opt mem  
dengyihao 已提交
2979 2980 2981
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2982 2983
  }

2984 2985
  taosArrayDestroy(pTableScanInfo->queryConds);
  destroyTableScanBase(&pTableScanInfo->base);
2986 2987 2988 2989 2990

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
D
dapan1121 已提交
2991
  taosMemoryFreeClear(param);
2992 2993 2994 2995
}

// Produce the EXPLAIN execution info of a table merge scan operator.
//
// pOptr         the operator; its info must be an STableMergeScanInfo
// pOptrExplain  out: newly allocated STableMergeScanExecInfo holding a copy of
//               the read recorder and of the sort statistics; ownership passes
//               to the caller. Set to NULL on failure.
// len           out: size of the returned structure in bytes, 0 on failure
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the structure
// cannot be allocated.
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);

  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->base.readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3008
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
3009
                                                STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
3010 3011 3012 3013 3014
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3015

3016 3017 3018
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
3019
  int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
H
Haojun Liao 已提交
3020
                                     &pInfo->base.matchInfo);
H
Haojun Liao 已提交
3021 3022 3023
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3024

H
Haojun Liao 已提交
3025
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
3026
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3027
    taosArrayDestroy(pInfo->base.matchInfo.pList);
3028 3029 3030 3031
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
3032
    SExprSupp* pSup = &pInfo->base.pseudoSup;
3033 3034
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
3035 3036 3037 3038
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

H
Haojun Liao 已提交
3039 3040 3041 3042 3043 3044
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

H
Haojun Liao 已提交
3045 3046
  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
3047
  pInfo->base.readHandle = *readHandle;
3048 3049 3050

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;
3051
  pInfo->base.pTableInfoList = pTableListInfo;
H
Haojun Liao 已提交
3052

3053
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3054
  pInfo->sample.seed = taosGetTimestampSec();
H
Haojun Liao 已提交
3055 3056 3057 3058 3059 3060

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
3061
  initResultSizeInfo(&pOperator->resultInfo, 1024);
H
Haojun Liao 已提交
3062
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
3063 3064
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

3065
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3066

H
Haojun Liao 已提交
3067
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
3068
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3069
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
3070

dengyihao's avatar
dengyihao 已提交
3071
  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
A
Alex Duan 已提交
3072 3073
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
3074

L
Liu Jicong 已提交
3075 3076
  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3077
  pOperator->exprSupp.numOfExprs = numOfCols;
3078

3079 3080
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
3081 3082 3083 3084 3085 3086 3087 3088 3089
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
3090 3091 3092 3093

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
3094
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
3095 3096 3097 3098 3099 3100
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName);
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
3101 3102
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
3103 3104 3105 3106 3107 3108 3109
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

// Record the output slots of the "db_name" and "stable_name" columns.
//
// Every node of scanCols must be a target node wrapping a column node;
// otherwise TSDB_CODE_QRY_SYS_ERROR is returned. Columns with any other name
// are ignored. A NULL list is accepted and leaves `supp` untouched.
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pNode = NULL;
  FOREACH(pNode, scanCols) {
    if (nodeType(pNode) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pNode;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SColumnNode* pCol = (SColumnNode*)(pTarget->pExpr);
    if (strcmp(pCol->colName, GROUP_TAG_DB_NAME) == 0) {
      supp->dbNameSlotId = pTarget->slotId;
    } else if (strcmp(pCol->colName, GROUP_TAG_STABLE_NAME) == 0) {
      supp->stbNameSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Record the output slot of the table-count pseudo column.
//
// Every node of pseudoCols must be a target node wrapping a function node;
// otherwise TSDB_CODE_QRY_SYS_ERROR is returned. Only functions of type
// FUNCTION_TYPE_TABLE_COUNT update supp->tbCountSlotId. A NULL list is accepted
// and leaves `supp` untouched.
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pNode = NULL;
  FOREACH(pNode, pseudoCols) {
    if (nodeType(pNode) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pNode;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_FUNCTION) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SFunctionNode* pFunc = (SFunctionNode*)(pTarget->pExpr);
    if (pFunc->funcType == FUNCTION_TYPE_TABLE_COUNT) {
      supp->tbCountSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Derive the grouping flags or the name filters of a table count scan.
//
// Without group tags, the database and table name of `tableName` are copied
// into the filter fields of `supp`. With group tags, each tag must be a column
// node (TSDB_CODE_QRY_SYS_ERROR otherwise); "db_name" and "stable_name" columns
// switch on the matching group-by flag.
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags == NULL) {
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
    return TSDB_CODE_SUCCESS;
  }

  SNode* pNode = NULL;
  FOREACH(pNode, groupTags) {
    if (nodeType(pNode) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SColumnNode* pCol = (SColumnNode*)pNode;
    if (strcmp(pCol->colName, GROUP_TAG_DB_NAME) == 0) {
      supp->groupByDbName = true;
    }
    if (strcmp(pCol->colName, GROUP_TAG_STABLE_NAME) == 0) {
      supp->groupByStbName = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Fill the table count scan support structure from the plan node parts:
// grouping flags / name filters first, then the output slot ids, which start
// at -1 and are overwritten for every column found. The first failing step is
// logged and its code returned.
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t code = tblCountScanGetInputs(groupTags, tableName, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return code;
  }

  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  code = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return code;
  }

  code = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
  }

  return code;
}
S
shenglian zhou 已提交
3201

S
slzhou 已提交
3202
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3203 3204 3205
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3206
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3207
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3208
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3209 3210 3211 3212 3213 3214 3215 3216 3217

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3218
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3219 3220
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3221 3222 3223
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3224 3225 3226

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3227 3228
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3240 3241 3242
// Write one result row (row 0) of a table count scan into pRes.
//
// pSupp    slot ids of the output columns; a slot id of -1 means the column is
//          not part of the output and is skipped
// dbName   database name; must be non-empty when the db name column is output
// stbName  super table name; an empty string is written as NULL
// count    number of tables
// pRes     result block; its row count is set to 1
//
// Both names are copied with the same bounded copy, and the stored var-string
// length is taken from the bytes actually copied, so it never exceeds the
// buffer contents.
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
    ASSERT(strlen(dbName));
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);

    char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);

    varDataSetLen(varDbName, strlen(varDataVal(varDbName)));
    colDataSetVal(colInfoData, 0, varDbName, false);
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
    if (strlen(stbName) != 0) {
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      tstrncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
      varDataSetLen(varStbName, strlen(varDataVal(varStbName)));
      colDataSetVal(colInfoData, 0, varStbName, false);
    } else {
      colDataSetNULL(colInfoData, 0);
    }
  }

  if (pSupp->tbCountSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
    colDataSetVal(colInfoData, 0, (char*)&count, false);
  }

  pRes->info.rows = 1;
}

S
slzhou 已提交
3272
// Build the next table-count block for the system databases.
//
// The table counts are obtained from getInfosDbMeta()/getPerfDbMeta(); only the count
// out-parameter is supplied to each. Depending on whether any group-by flag is set in the scan
// support info, the grouped or the filtered builder fills pInfo->pRes.
//
// Returns pInfo->pRes when it holds at least one row, NULL otherwise.
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
  STableCountScanSupp* pSupp = &pInfo->supp;
  SSDataBlock*         pBlock = pInfo->pRes;

  size_t numInfoDbTables;
  size_t numPerfDbTables;
  getInfosDbMeta(NULL, &numInfoDbTables);
  getPerfDbMeta(NULL, &numPerfDbTables);

  if (pSupp->groupByDbName || pSupp->groupByStbName) {
    buildSysDbGroupedTableCount(pOperator, pInfo, pSupp, pBlock, numInfoDbTables, numPerfDbTables);
  } else {
    buildSysDbFilterTableCount(pOperator, pSupp, pBlock, numInfoDbTables, numPerfDbTables);
  }

  if (pBlock->info.rows > 0) {
    return pBlock;
  }
  return NULL;
}

S
slzhou 已提交
3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305
// Emit the (ungrouped) table count for the system databases according to pSupp->dbNameFilter:
//   - filter equals TSDB_INFORMATION_SCHEMA_DB: count of that schema's tables
//   - filter equals TSDB_PERFORMANCE_SCHEMA_DB: count of that schema's tables
//   - filter is empty: sum of both counts, reported with an empty db name
//   - any other filter: no row is written
// The operator is marked completed in every case.
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  char*  matchedDb = NULL;  // stays NULL when the filter matches nothing
  size_t tableNum = 0;

  if (strcmp(pSupp->dbNameFilter, TSDB_INFORMATION_SCHEMA_DB) == 0) {
    matchedDb = TSDB_INFORMATION_SCHEMA_DB;
    tableNum = infodbTableNum;
  } else if (strcmp(pSupp->dbNameFilter, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
    matchedDb = TSDB_PERFORMANCE_SCHEMA_DB;
    tableNum = perfdbTableNum;
  } else if (pSupp->dbNameFilter[0] == '\0') {
    matchedDb = "";
    tableNum = infodbTableNum + perfdbTableNum;
  }

  if (matchedDb != NULL) {
    fillTableCountScanDataBlock(pSupp, matchedDb, "", tableNum, pRes);
  }
  setOperatorCompleted(pOperator);
}

// Emit one group of the grouped system-database table count per call, driven by pInfo->currGrpIdx:
//   index 0 -> TSDB_INFORMATION_SCHEMA_DB, index 1 -> TSDB_PERFORMANCE_SCHEMA_DB,
//   any other index -> nothing is written and the operator is marked completed.
// The group id is derived from the db name when grouping by db name, otherwise from an empty key.
// currGrpIdx is advanced on every call.
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  char*  sysDbName = NULL;
  size_t tableNum = 0;

  switch (pInfo->currGrpIdx) {
    case 0:
      sysDbName = TSDB_INFORMATION_SCHEMA_DB;
      tableNum = infodbTableNum;
      break;
    case 1:
      sysDbName = TSDB_PERFORMANCE_SCHEMA_DB;
      tableNum = perfdbTableNum;
      break;
    default:
      break;
  }

  if (sysDbName == NULL) {
    setOperatorCompleted(pOperator);
  } else {
    uint64_t groupId = 0;
    if (pSupp->groupByDbName) {
      groupId = calcGroupId(sysDbName, strlen(sysDbName));
    } else {
      groupId = calcGroupId("", 0);
    }

    pRes->info.id.groupId = groupId;
    fillTableCountScanDataBlock(pSupp, sysDbName, "", tableNum, pRes);
  }

  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3331
// Entry point of the table-count scan operator: returns the next result block, or NULL when
// there is nothing (more) to return.
//
// The result block is cleaned on every call, before the completion check. When the read handle
// carries a non-NULL mnd pointer the system-database path is taken; otherwise the vnode path.
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
  STableCountScanOperatorInfo* pInfo = pOperator->info;
  STableCountScanSupp*         pSupp = &pInfo->supp;
  SSDataBlock*                 pRes = pInfo->pRes;

  blockDataCleanup(pRes);

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  if (pInfo->readHandle.mnd != NULL) {
    return buildSysDbTableCount(pOperator, pInfo);
  }

  return buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
}

// Build the next table-count block from a vnode.
//
// The vnode's db string and vgroup id come from vnodeGetInfo(); the db string is parsed with the
// T_NAME_ACCT | T_NAME_DB flags and the plain db name is extracted from the parsed name. The
// grouped or filtered builder is then chosen from the group-by flags.
//
// Returns pRes when it holds at least one row, NULL otherwise.
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
  char        dbName[TSDB_DB_NAME_LEN] = {0};
  const char* dbFullName = NULL;
  int32_t     vgId = 0;
  SName       parsedName = {0};

  // resolve the db name of this vnode
  vnodeGetInfo(pInfo->readHandle.vnode, &dbFullName, &vgId);
  tNameFromString(&parsedName, dbFullName, T_NAME_ACCT | T_NAME_DB);
  tNameGetDbName(&parsedName, dbName);

  if (!pSupp->groupByDbName && !pSupp->groupByStbName) {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
  } else {
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
  }

  if (pRes->info.rows > 0) {
    return pRes;
  }
  return NULL;
}

// Emit one group of the grouped vnode table count per call.
//
// Without grouping by stable name, a single row with the vnode's total table count is written
// (group id derived from dbName) and the operator is completed.
//
// With grouping by stable name, the stable uid list is loaded on first use and pInfo->currGrpIdx
// walks it: one call per stable, then one call for the normal tables, then the operator is
// completed. A failure to load the uid list is only logged.
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  if (!pSupp->groupByStbName) {
    pRes->info.id.groupId = calcGroupId(dbName, strlen(dbName));

    int64_t numTablesInDb = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", numTablesInDb, pRes);
    setOperatorCompleted(pOperator);
    return;
  }

  if (pInfo->stbUidList == NULL) {
    pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
    if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
      qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
    }
  }

  // past the extra "normal tables" slot: every group has been emitted
  if (pInfo->currGrpIdx > taosArrayGetSize(pInfo->stbUidList)) {
    setOperatorCompleted(pOperator);
    return;
  }

  if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
    buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);
  } else {
    tb_uid_t* pStbUid = taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
    buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, *pStbUid);
  }

  pInfo->currGrpIdx++;
}

// Emit the single ungrouped table-count row for a vnode and mark the operator completed.
//
// When both the db-name filter and the stable-name filter are non-empty, the row carries the
// child-table count (stats.ctbNum) of the filtered stable. In every other case it carries the
// vnode's total table count with no stable name.
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  if (pSupp->dbNameFilter[0] != '\0' && pSupp->stbNameFilter[0] != '\0') {
    tb_uid_t      stbUid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter);
    SMetaStbStats stbStats = {0};
    metaGetStbStats(pInfo->readHandle.meta, stbUid, &stbStats);

    fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, stbStats.ctbNum, pRes);
  } else {
    int64_t numTablesInVnode = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", numTablesInVnode, pRes);
  }

  setOperatorCompleted(pOperator);
}

// Emit the group for the tables counted by metaGetNtbNum() (no stable name).
//
// The group key is "<dbName>." when grouping by db name and the empty string otherwise; the
// block's group id is always set from it. A row is written only when the count is non-zero.
static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName) {
  char groupKey[TSDB_TABLE_FNAME_LEN] = {0};
  if (pSupp->groupByDbName) {
    snprintf(groupKey, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
  }

  pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));

  int64_t numNormalTables = metaGetNtbNum(pInfo->readHandle.meta);
  if (numNormalTables == 0) {
    return;
  }

  fillTableCountScanDataBlock(pSupp, dbName, "", numNormalTables, pRes);
}

// Emit the group for one stable identified by stbUid.
//
// The stable name is looked up by uid. The group key is "<dbName>.<stbName>" when grouping by db
// name and "<stbName>" otherwise. The row carries the stable's child-table count (stats.ctbNum).
static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
  char          stbName[TSDB_TABLE_NAME_LEN] = {0};
  char          groupKey[TSDB_TABLE_FNAME_LEN] = {0};
  SMetaStbStats stbStats = {0};

  metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);

  if (!pSupp->groupByDbName) {
    snprintf(groupKey, TSDB_TABLE_FNAME_LEN, "%s", stbName);
  } else {
    snprintf(groupKey, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
  }
  pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));

  metaGetStbStats(pInfo->readHandle.meta, stbUid, &stbStats);
  fillTableCountScanDataBlock(pSupp, dbName, stbName, stbStats.ctbNum, pRes);
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3456
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3457 3458
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3459
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3460 3461
  taosMemoryFreeClear(param);
}