scanoperator.c 131.0 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

D
dapan1121 已提交
34 35
int32_t scanDebug = 0;

X
Xiaoyu Wang 已提交
36
#define MULTI_READER_MAX_TABLE_NUM   5000
H
Haojun Liao 已提交
37
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
38
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
39

H
Haojun Liao 已提交
40 41 42 43 44 45 46 47 48 49
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;
  SSortExecInfo          sortExecInfo;
} STableMergeScanExecInfo;

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
  uint64_t       uid;
  SSDataBlock*   inputBlock;
D
dapan1121 已提交
50 51
  bool           multiReader;
  STsdbReader*   dataReader;
H
Haojun Liao 已提交
52 53
} STableMergeScanSortSourceParam;

L
Liu Jicong 已提交
54
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
55

H
Haojun Liao 已提交
56
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
57 58 59 60 61 62 63 64 65 66 67 68
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#else
  return true;
#endif
}

69
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
H
Haojun Liao 已提交
70 71 72 73 74
  for (int32_t i = 0; i < numOfOutput; ++i) {
    SWITCH_ORDER(pCtx[i].order);
  }
}

75 76 77 78 79 80 81 82 83
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
  if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  int64_t key = tw->skey, interval = pInterval->interval;
84
  // convert key to second
85 86 87 88 89 90 91
  key = convertTimePrecision(key, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;

  if (pInterval->intervalUnit == 'y') {
    interval *= 12;
  }

  struct tm tm;
92
  time_t    t = (time_t)key;
93
  taosLocalTime(&t, &tm, NULL);
94 95 96 97

  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
98
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
99 100 101 102

  mon = (int)(mon + interval);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
wafwerar's avatar
wafwerar 已提交
103
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
104 105 106 107

  tw->ekey -= 1;
}

108
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
109 110 111 112 113 114 115
  STimeWindow w = {0};

  // 0 by default, which means it is not a interval operator of the upstream operator.
  if (pInterval->interval == 0) {
    return false;
  }

116
  if (order == TSDB_ORDER_ASC) {
117
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
118
    ASSERT(w.ekey >= pBlockInfo->window.skey);
119

120
    if (w.ekey < pBlockInfo->window.ekey) {
121 122 123
      return true;
    }

124 125
    while (1) {
      getNextTimeWindow(pInterval, &w, order);
126 127 128 129
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

130
      ASSERT(w.ekey > pBlockInfo->window.ekey);
131
      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
132 133 134 135
        return true;
      }
    }
  } else {
136
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
137
    ASSERT(w.skey <= pBlockInfo->window.ekey);
138

139
    if (w.skey > pBlockInfo->window.skey) {
140 141 142
      return true;
    }

143
    while (1) {
144 145 146 147 148 149
      getNextTimeWindow(pInterval, &w, order);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      assert(w.skey < pBlockInfo->window.skey);
150
      if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
151 152 153
        return true;
      }
    }
154 155 156 157 158
  }

  return false;
}

159 160 161 162 163 164 165 166 167 168 169
// this function is for table scanner to extract temporary results of upstream aggregate results.
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  int64_t buf[2] = {0};
  SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId);

  STableScanInfo* pTableScanInfo = pOperator->info;

S
slzhou 已提交
170 171
  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable,
                                                               buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
172 173 174 175 176

  if (p1 == NULL) {
    return NULL;
  }

H
Haojun Liao 已提交
177
  *pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId);
178 179 180
  if (NULL == *pPage) {
    return NULL;
  }
L
Liu Jicong 已提交
181

182 183 184 185 186 187
  return (SResultRow*)((char*)(*pPage) + p1->offset);
}

static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pTableScanInfo = pOperator->info;

H
Haojun Liao 已提交
188
  if (pTableScanInfo->base.pdInfo.pExprSup == NULL) {
189 190 191
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
192
  SExprSupp* pSup1 = pTableScanInfo->base.pdInfo.pExprSup;
193 194

  SFilePage*  pPage = NULL;
H
Haojun Liao 已提交
195
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
196 197 198 199 200 201 202 203 204

  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool notLoadBlock = true;
  for (int32_t i = 0; i < pSup1->numOfExprs; ++i) {
    int32_t functionId = pSup1->pCtx[i].functionId;

H
Haojun Liao 已提交
205
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->base.pdInfo.pExprSup->rowEntryInfoOffset);
206 207 208 209 210 211 212 213 214

    int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window);
    if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) {
      notLoadBlock = false;
      break;
    }
  }

  // release buffer pages
H
Haojun Liao 已提交
215
  releaseBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, pPage);
216 217 218 219 220 221 222 223

  if (notLoadBlock) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
224
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
225
                               int32_t numOfRows) {
H
Haojun Liao 已提交
226
  if (pColsAgg == NULL || pFilterInfo == NULL) {
H
Haojun Liao 已提交
227 228 229
    return true;
  }

H
Haojun Liao 已提交
230
  bool keep = filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
H
Haojun Liao 已提交
231 232 233
  return keep;
}

H
Haojun Liao 已提交
234
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
235
  bool    allColumnsHaveAgg = true;
236
  int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, pBlock, &allColumnsHaveAgg);
H
Haojun Liao 已提交
237
  if (code != TSDB_CODE_SUCCESS) {
238
    T_LONG_JMP(pTaskInfo->env, code);
H
Haojun Liao 已提交
239 240 241 242 243 244 245 246
  }

  if (!allColumnsHaveAgg) {
    return false;
  }
  return true;
}

H
Haojun Liao 已提交
247
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
248
                               int32_t rows) {
H
Haojun Liao 已提交
249 250 251
  if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
    SExprSupp* pSup = &pTableScanInfo->pseudoSup;

252
    int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
253
                                          GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);
H
Haojun Liao 已提交
254
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
H
Haojun Liao 已提交
255
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
H
Haojun Liao 已提交
256 257
      T_LONG_JMP(pTaskInfo->env, code);
    }
H
Haojun Liao 已提交
258 259 260

    // reset the error code.
    terrno = 0;
H
Haojun Liao 已提交
261 262 263
  }
}

264
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
265
  SLimit*     pLimit = &pLimitInfo->limit;
H
Haojun Liao 已提交
266
  const char* id = GET_TASKID(pTaskInfo);
267

268
  if (pLimitInfo->remainOffset > 0) {
269 270
    if (pLimitInfo->remainOffset >= pBlock->info.rows) {
      pLimitInfo->remainOffset -= pBlock->info.rows;
H
Haojun Liao 已提交
271
      blockDataEmpty(pBlock);
H
Haojun Liao 已提交
272
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
273
      return false;
274
    } else {
275
      blockDataTrimFirstRows(pBlock, pLimitInfo->remainOffset);
276 277 278 279 280 281
      pLimitInfo->remainOffset = 0;
    }
  }

  if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
    // limit the output rows
282
    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
283
    blockDataKeepFirstNRows(pBlock, keep);
284 285

    pLimitInfo->numOfOutputRows += pBlock->info.rows;
H
Haojun Liao 已提交
286
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
287
    return true;
288
  }
289

290
  pLimitInfo->numOfOutputRows += pBlock->info.rows;
291
  return false;
292 293
}

H
Haojun Liao 已提交
294
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
L
Liu Jicong 已提交
295
                             uint32_t* status) {
S
slzhou 已提交
296
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
297
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
H
Haojun Liao 已提交
298 299

  pCost->totalBlocks += 1;
300
  pCost->totalRows += pBlock->info.rows;
301

H
Haojun Liao 已提交
302
  bool loadSMA = false;
H
Haojun Liao 已提交
303
  *status = pTableScanInfo->dataBlockLoadFlag;
H
Haojun Liao 已提交
304
  if (pOperator->exprSupp.pFilterInfo != NULL ||
305
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
306 307 308 309
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
310
  taosMemoryFreeClear(pBlock->pBlockAgg);
311 312

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
X
Xiaoyu Wang 已提交
313
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
314
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
315
    pCost->filterOutBlocks += 1;
316
    pCost->totalRows += pBlock->info.rows;
317
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
318 319
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
X
Xiaoyu Wang 已提交
320 321 322
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows,
           pBlockInfo->id.uid);
323
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
324
    pCost->skipBlocks += 1;
325
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
326
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
327
  } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
328
    pCost->loadBlockStatis += 1;
L
Liu Jicong 已提交
329
    loadSMA = true;  // mark the operation of load sma;
H
Haojun Liao 已提交
330
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
L
Liu Jicong 已提交
331
    if (success) {  // failed to load the block sma data, data block statistics does not exist, load data block instead
X
Xiaoyu Wang 已提交
332
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
333
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
334
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
335
      tsdbReleaseDataBlock(pTableScanInfo->dataReader);
336 337
      return TSDB_CODE_SUCCESS;
    } else {
338
      qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
339
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
340
    }
H
Haojun Liao 已提交
341
  }
342

H
Haojun Liao 已提交
343
  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
344

H
Haojun Liao 已提交
345
  // try to filter data block according to sma info
H
Haojun Liao 已提交
346
  if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
347 348 349
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      size_t size = taosArrayGetSize(pBlock->pDataBlock);
H
Haojun Liao 已提交
350
      bool   keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
351
      if (!keep) {
X
Xiaoyu Wang 已提交
352 353
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
               GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
354 355 356
        pCost->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

357
        tsdbReleaseDataBlock(pTableScanInfo->dataReader);
358 359
        return TSDB_CODE_SUCCESS;
      }
360
    }
H
Haojun Liao 已提交
361
  }
362

363 364 365
  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

366
  // try to filter data block according to current results
367 368
  doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
X
Xiaoyu Wang 已提交
369 370
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
371
    pCost->skipBlocks += 1;
372
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
373
    *status = FUNC_DATA_REQUIRED_FILTEROUT;
374 375 376
    return TSDB_CODE_SUCCESS;
  }

H
Haojun Liao 已提交
377 378
  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;
379

H
Haojun Liao 已提交
380 381
  SSDataBlock* p = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (p == NULL) {
H
Haojun Liao 已提交
382
    return terrno;
H
Haojun Liao 已提交
383 384
  }

H
Haojun Liao 已提交
385
  ASSERT(p == pBlock);
386
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
387

H
Haojun Liao 已提交
388 389
  // restore the previous value
  pCost->totalRows -= pBlock->info.rows;
390

H
Haojun Liao 已提交
391
  if (pOperator->exprSupp.pFilterInfo != NULL) {
392
    int64_t st = taosGetTimestampUs();
H
Haojun Liao 已提交
393
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
394

395 396
    double el = (taosGetTimestampUs() - st) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;
397

398 399
    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
D
dapan1121 已提交
400
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
401 402 403 404
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
405 406
  }

407
  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
X
Xiaoyu Wang 已提交
408
  if (limitReached) {  // set operator flag is done
409 410
    setOperatorCompleted(pOperator);
  }
411

H
Haojun Liao 已提交
412
  pCost->totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
413 414 415
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
416
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
H
Haojun Liao 已提交
417 418 419
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);

  switchCtxOrder(pCtx, numOfOutput);
420
  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
H
Haojun Liao 已提交
421 422
  STimeWindow* pTWindow = &pTableScanInfo->cond.twindows;
  TSWAP(pTWindow->skey, pTWindow->ekey);
H
Haojun Liao 已提交
423 424
}

425 426
typedef struct STableCachedVal {
  const char* pName;
427
  STag*       pTags;
428 429
} STableCachedVal;

430 431 432 433 434 435 436 437 438 439 440
static void freeTableCachedVal(void* param) {
  if (param == NULL) {
    return;
  }

  STableCachedVal* pVal = param;
  taosMemoryFree((void*)pVal->pName);
  taosMemoryFree(pVal->pTags);
  taosMemoryFree(pVal);
}

H
Haojun Liao 已提交
441 442
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
443
  pVal->pName = taosStrdup(pMetaReader->me.name);
H
Haojun Liao 已提交
444 445 446 447
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
448
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
449 450 451 452 453 454 455
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

456 457
// const void *key, size_t keyLen, void *value
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) { freeTableCachedVal(value); }
458

459 460 461 462 463
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
  for (int32_t j = 0; j < numOfExpr; ++j) {
    int32_t dstSlotId = pExpr[j].base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
464
    colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
465 466 467
  }
}

468 469
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
470
  // currently only the tbname pseudo column
471
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
472
    return TSDB_CODE_SUCCESS;
473 474
  }

475 476
  int32_t code = 0;

477 478 479 480
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

481
  bool            freeReader = false;
482
  STableCachedVal val = {0};
483 484

  SMetaReader mr = {0};
485
  LRUHandle*  h = NULL;
486

487 488 489
  // todo refactor: extract method
  // the handling of the null data should be packed in the extracted method

490
  // 1. check if it is existed in meta cache
491
  if (pCache == NULL) {
492
    metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
493
    code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
494
    if (code != TSDB_CODE_SUCCESS) {
495
      // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
H
Haojun Liao 已提交
496
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
S
slzhou 已提交
497 498
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);
499 500 501

        // append null value before return to caller, since the caller will ignore this error code and proceed
        doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
502
      } else {
S
slzhou 已提交
503 504
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
H
Haojun Liao 已提交
505
      }
506 507 508 509 510
      metaReaderClear(&mr);
      return terrno;
    }

    metaReaderReleaseLock(&mr);
511

512 513
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
514 515

    freeReader = true;
516
  } else {
517 518
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
519
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
520 521
    if (h == NULL) {
      metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
522
      code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
523
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
524
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
525
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
526
                pBlock->info.id.uid, tstrerror(terrno), idStr);
527 528
          // append null value before return to caller, since the caller will ignore this error code and proceed
          doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
529
        } else {
H
Haojun Liao 已提交
530
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
531
                 idStr);
H
Haojun Liao 已提交
532
        }
533 534 535 536 537 538
        metaReaderClear(&mr);
        return terrno;
      }

      metaReaderReleaseLock(&mr);

H
Haojun Liao 已提交
539
      STableCachedVal* pVal = createTableCacheVal(&mr);
540

H
Haojun Liao 已提交
541
      val = *pVal;
542
      freeReader = true;
H
Haojun Liao 已提交
543

H
Haojun Liao 已提交
544
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
545
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
546 547 548 549 550 551 552 553
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
554

H
Haojun Liao 已提交
555
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
556
    }
H
Haojun Liao 已提交
557

558 559
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
560
  }
561

562 563
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
564
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
565 566

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
567
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
568

569
    int32_t functionId = pExpr1->pExpr->_function.functionId;
570 571 572

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
573
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
574
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
575
      STagVal tagVal = {0};
576 577
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
578

579 580 581 582
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
583
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
584
      }
585

H
Haojun Liao 已提交
586 587
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
588
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
589
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
D
dapan1121 已提交
590
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
H
Haojun Liao 已提交
591 592 593
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
D
dapan1121 已提交
594 595 596 597 598 599
        if (code) {
          if (freeReader) {
            metaReaderClear(&mr);
          }
          return code;
        }
L
Liu Jicong 已提交
600
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
601
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
602
          colDataSetVal(pColInfoData, i, data, false);
H
Haojun Liao 已提交
603
        }
604 605 606 607
      }
    }
  }

608 609
  // restore the rows
  pBlock->info.rows = backupRows;
610 611 612 613
  if (freeReader) {
    metaReaderClear(&mr);
  }

H
Haojun Liao 已提交
614
  return TSDB_CODE_SUCCESS;
615 616
}

H
Haojun Liao 已提交
617
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
618 619 620
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

H
Haojun Liao 已提交
621
  size_t len = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
622
  char   buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
H
Haojun Liao 已提交
623 624 625
  STR_TO_VARSTR(buf, name)

  SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, len, 1);
626

H
Haojun Liao 已提交
627
  colInfoDataEnsureCapacity(&infoData, 1, false);
628
  colDataSetVal(&infoData, 0, buf, false);
629

H
Haojun Liao 已提交
630
  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData};
631
  SScalarParam param = {.columnData = pColInfoData};
H
Haojun Liao 已提交
632 633 634 635 636 637 638

  if (fpSet.process != NULL) {
    fpSet.process(&srcParam, 1, &param);
  } else {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  }

D
dapan1121 已提交
639
  colDataDestroy(&infoData);
640 641
}

642
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
643
  STableScanInfo* pTableScanInfo = pOperator->info;
644
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
L
Liu Jicong 已提交
645
  SSDataBlock*    pBlock = pTableScanInfo->pResBlock;
D
dapan1121 已提交
646 647
  bool            hasNext = false;
  int32_t         code = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
648

649 650
  int64_t st = taosGetTimestampUs();

D
dapan1121 已提交
651 652 653 654 655 656 657 658 659 660
  while (true) {
    code = tsdbNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
    if (code) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
661

662
    if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
663
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
664
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
665
    }
H
Haojun Liao 已提交
666

667
    if (pOperator->status == OP_EXEC_DONE) {
X
Xiaoyu Wang 已提交
668
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
669 670 671
      break;
    }

672 673 674 675 676 677
    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

D
dapan1121 已提交
678
    if (pBlock->info.id.uid) {
679
      pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
D
dapan1121 已提交
680
    }
681

682
    uint32_t status = 0;
H
Haojun Liao 已提交
683
    int32_t  code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
684
    if (code != TSDB_CODE_SUCCESS) {
685
      T_LONG_JMP(pTaskInfo->env, code);
686
    }
687

688 689 690
    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
691
    }
692

H
Haojun Liao 已提交
693 694
    pOperator->resultInfo.totalRows = pTableScanInfo->base.readRecorder.totalRows;
    pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
695

H
Haojun Liao 已提交
696
    pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;
697 698

    // todo refactor
H
Haojun Liao 已提交
699
    /*pTableScanInfo->lastStatus.uid = pBlock->info.id.uid;*/
L
Liu Jicong 已提交
700 701
    /*pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;*/
    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
H
Haojun Liao 已提交
702
    pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
L
Liu Jicong 已提交
703
    pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
704

705
    return pBlock;
H
Haojun Liao 已提交
706 707 708 709
  }
  return NULL;
}

H
Haojun Liao 已提交
710
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
711 712 713 714
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
H
Haojun Liao 已提交
715
  if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
H
Haojun Liao 已提交
716 717 718
    return NULL;
  }

719 720
  // do the ascending order traverse in the first place.
  while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
H
Haojun Liao 已提交
721 722 723
    SSDataBlock* p = doTableScanImpl(pOperator);
    if (p != NULL) {
      return p;
H
Haojun Liao 已提交
724 725
    }

726
    pTableScanInfo->scanTimes += 1;
727

728
    if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
729
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
G
Ganlin Zhao 已提交
730 731
      pTableScanInfo->base.scanFlag = MAIN_SCAN;
      pTableScanInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
732
      qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
733

734
      // do prepare for the next round table scan operation
H
Haojun Liao 已提交
735
      tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
H
Haojun Liao 已提交
736
    }
737
  }
H
Haojun Liao 已提交
738

739
  int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
740
  if (pTableScanInfo->scanTimes < total) {
H
Haojun Liao 已提交
741 742 743
    if (pTableScanInfo->base.cond.order == TSDB_ORDER_ASC) {
      prepareForDescendingScan(&pTableScanInfo->base, pOperator->exprSupp.pCtx, 0);
      tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
744
      qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
745
    }
H
Haojun Liao 已提交
746

747
    while (pTableScanInfo->scanTimes < total) {
H
Haojun Liao 已提交
748 749 750
      SSDataBlock* p = doTableScanImpl(pOperator);
      if (p != NULL) {
        return p;
751
      }
H
Haojun Liao 已提交
752

753
      pTableScanInfo->scanTimes += 1;
H
Haojun Liao 已提交
754

755
      if (pTableScanInfo->scanTimes < total) {
756
        setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
G
Ganlin Zhao 已提交
757
        pTableScanInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
758

759
        qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
760
        tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
761
      }
H
Haojun Liao 已提交
762 763 764
    }
  }

wmmhello's avatar
wmmhello 已提交
765 766 767 768 769 770 771
  return NULL;
}

static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

772
  // scan table one by one sequentially
L
Liu Jicong 已提交
773
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
X
Xiaoyu Wang 已提交
774
    int32_t       numOfTables = 0;  // tableListGetSize(pTaskInfo->pTableListInfo);
775
    STableKeyInfo tInfo = {0};
H
Haojun Liao 已提交
776

L
Liu Jicong 已提交
777
    while (1) {
H
Haojun Liao 已提交
778
      SSDataBlock* result = doGroupedTableScan(pOperator);
H
Haojun Liao 已提交
779
      if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
L
Liu Jicong 已提交
780 781
        return result;
      }
H
Haojun Liao 已提交
782

L
Liu Jicong 已提交
783 784
      // if no data, switch to next table and continue scan
      pInfo->currentTable++;
785 786

      taosRLockLatch(&pTaskInfo->lock);
787
      numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
788

H
Haojun Liao 已提交
789
      if (pInfo->currentTable >= numOfTables) {
H
Haojun Liao 已提交
790
        qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
791
        taosRUnLockLatch(&pTaskInfo->lock);
L
Liu Jicong 已提交
792 793
        return NULL;
      }
H
Haojun Liao 已提交
794

X
Xiaoyu Wang 已提交
795
      tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
796 797 798 799
      taosRUnLockLatch(&pTaskInfo->lock);

      tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
H
Haojun Liao 已提交
800
             pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
801

H
Haojun Liao 已提交
802
      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
L
Liu Jicong 已提交
803 804
      pInfo->scanTimes = 0;
    }
805 806
  } else {  // scan table group by group sequentially
    if (pInfo->currentGroupId == -1) {
807
      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
H
Haojun Liao 已提交
808
        setOperatorCompleted(pOperator);
809 810
        return NULL;
      }
811

5
54liuyao 已提交
812
      int32_t        num = 0;
813
      STableKeyInfo* pList = NULL;
814
      tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
H
Haojun Liao 已提交
815
      ASSERT(pInfo->base.dataReader == NULL);
816

L
Liu Jicong 已提交
817
      int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
D
dapan1121 已提交
818
                                    (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly);
819 820 821
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }
822 823 824 825

      if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
        pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
      }
wmmhello's avatar
wmmhello 已提交
826
    }
H
Haojun Liao 已提交
827

H
Haojun Liao 已提交
828
    SSDataBlock* result = doGroupedTableScan(pOperator);
829 830 831
    if (result != NULL) {
      return result;
    }
H
Haojun Liao 已提交
832

833
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
H
Haojun Liao 已提交
834
      setOperatorCompleted(pOperator);
835 836
      return NULL;
    }
wmmhello's avatar
wmmhello 已提交
837

838 839
    // reset value for the next group data output
    pOperator->status = OP_OPENED;
840
    resetLimitInfoForNextGroup(&pInfo->base.limitInfo);
wmmhello's avatar
wmmhello 已提交
841

5
54liuyao 已提交
842
    int32_t        num = 0;
843
    STableKeyInfo* pList = NULL;
844
    tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
wmmhello's avatar
wmmhello 已提交
845

H
Haojun Liao 已提交
846 847
    tsdbSetTableList(pInfo->base.dataReader, pList, num);
    tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
848
    pInfo->scanTimes = 0;
wmmhello's avatar
wmmhello 已提交
849

H
Haojun Liao 已提交
850
    result = doGroupedTableScan(pOperator);
851 852 853
    if (result != NULL) {
      return result;
    }
854

H
Haojun Liao 已提交
855
    setOperatorCompleted(pOperator);
856 857
    return NULL;
  }
H
Haojun Liao 已提交
858 859
}

860 861
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
862
  STableScanInfo*         pTableScanInfo = pOptr->info;
H
Haojun Liao 已提交
863
  *pRecorder = pTableScanInfo->base.readRecorder;
864 865 866 867 868
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

869 870
static void destroyTableScanBase(STableScanBase* pBase) {
  cleanupQueryTableDataCond(&pBase->cond);
H
Haojun Liao 已提交
871

872 873
  tsdbReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;
874

875 876
  if (pBase->matchInfo.pList != NULL) {
    taosArrayDestroy(pBase->matchInfo.pList);
877
  }
L
Liu Jicong 已提交
878

879
  tableListDestroy(pBase->pTableListInfo);
880 881 882 883 884 885 886 887
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}

static void destroyTableScanOperatorInfo(void* param) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  blockDataDestroy(pTableScanInfo->pResBlock);
  destroyTableScanBase(&pTableScanInfo->base);
D
dapan1121 已提交
888
  taosMemoryFreeClear(param);
889 890
}

891
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
892
                                           STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
893 894 895
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
896
    goto _error;
H
Haojun Liao 已提交
897 898
  }

899
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
900
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
901 902

  int32_t numOfCols = 0;
903
  int32_t code =
H
Haojun Liao 已提交
904
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
905 906 907 908
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
909
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
910
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
911
  if (code != TSDB_CODE_SUCCESS) {
912
    goto _error;
913 914
  }

H
Haojun Liao 已提交
915
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
916
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
917
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
918
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
919 920
  }

921
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
G
Ganlin Zhao 已提交
922
  pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
H
Haojun Liao 已提交
923

H
Haojun Liao 已提交
924 925
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
926 927
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

928 929
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
930

H
Haojun Liao 已提交
931
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
932
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
X
Xiaoyu Wang 已提交
933
  //  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
934

H
Haojun Liao 已提交
935 936 937
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
938 939
  }

wmmhello's avatar
wmmhello 已提交
940
  pInfo->currentGroupId = -1;
941
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
942
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
943

L
Liu Jicong 已提交
944 945
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
946
  pOperator->exprSupp.numOfExprs = numOfCols;
947

948
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
949 950
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
951 952 953
    code = terrno;
    goto _error;
  }
954

D
dapan1121 已提交
955 956 957 958
  if (scanDebug) {
    pInfo->countOnly = true;
  }

H
Haojun Liao 已提交
959
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
960 961
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
962 963 964

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
965
  return pOperator;
966

967
_error:
968 969 970
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
971

972 973
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
974
  return NULL;
H
Haojun Liao 已提交
975 976
}

977
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
978
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
L
Liu Jicong 已提交
979
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
H
Haojun Liao 已提交
980

H
Haojun Liao 已提交
981
  pInfo->base.dataReader = pReadHandle;
L
Liu Jicong 已提交
982
  //  pInfo->prevGroupId       = -1;
H
Haojun Liao 已提交
983

L
Liu Jicong 已提交
984 985
  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
986
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
987 988 989
  return pOperator;
}

990
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
5
54liuyao 已提交
991
  qDebug("clear buff blocks:%d", (int32_t)taosArrayGetSize(pInfo->pBlockLists));
L
Liu Jicong 已提交
992 993
  taosArrayClear(pInfo->pBlockLists);
  pInfo->validBlockIndex = 0;
H
Haojun Liao 已提交
994 995
}

996
static bool isSessionWindow(SStreamScanInfo* pInfo) {
H
Haojun Liao 已提交
997
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
5
54liuyao 已提交
998 999
}

1000
static bool isStateWindow(SStreamScanInfo* pInfo) {
1001
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
5
54liuyao 已提交
1002
}
5
54liuyao 已提交
1003

L
Liu Jicong 已提交
1004
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
1005 1006 1007
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
         pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
         pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
5
54liuyao 已提交
1008 1009 1010
}

static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
1011
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
L
Liu Jicong 已提交
1012 1013
}

1014 1015 1016 1017
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  return isIntervalWindow(pInfo) && pInfo->interval.interval != pInfo->interval.sliding;
}

1018
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
1019 1020
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  uint64_t*        groupCol = (uint64_t*)pColInfo->pData;
1021
  ASSERT(rowIndex < pBlock->info.rows);
1022
  pInfo->groupId = groupCol[rowIndex];
1023 1024
}

L
fix bug  
liuyao 已提交
1025
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t version) {
H
Haojun Liao 已提交
1026
  pTableScanInfo->base.cond.twindows = *pWin;
L
fix bug  
liuyao 已提交
1027
  pTableScanInfo->base.cond.endVersion = version;
L
Liu Jicong 已提交
1028 1029
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;
H
Haojun Liao 已提交
1030
  tsdbReaderClose(pTableScanInfo->base.dataReader);
D
dapan1121 已提交
1031
  qDebug("1");
H
Haojun Liao 已提交
1032
  pTableScanInfo->base.dataReader = NULL;
1033 1034
}

L
Liu Jicong 已提交
1035 1036
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
1037
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
1038

1039
  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
H
Haojun Liao 已提交
1040
  SQueryTableDataCond cond = pTableScanInfo->base.cond;
1041 1042 1043 1044 1045 1046 1047 1048 1049

  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  STsdbReader* pReader = NULL;
L
Liu Jicong 已提交
1050
  int32_t      code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
D
dapan1121 已提交
1051
                                     (STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false);
1052 1053
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
dengyihao's avatar
dengyihao 已提交
1054
    T_LONG_JMP(pTaskInfo->env, code);
1055 1056 1057
    return NULL;
  }

D
dapan1121 已提交
1058 1059
  bool hasNext = false;
  code = tsdbNextDataBlock(pReader, &hasNext);
1060 1061
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
dengyihao's avatar
dengyihao 已提交
1062
    T_LONG_JMP(pTaskInfo->env, code);
1063 1064 1065
    return NULL;
  }

D
dapan1121 已提交
1066
  if (hasNext) {
L
Liu Jicong 已提交
1067
    /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
H
Haojun Liao 已提交
1068
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
1069
    pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
1070 1071 1072
  }

  tsdbReaderClose(pReader);
D
dapan1121 已提交
1073
  qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
5
54liuyao 已提交
1074 1075
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
1076 1077

  return pBlock->info.rows > 0 ? pBlock : NULL;
1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088
}

static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (!pPreRes || pPreRes->info.rows == 0) {
    return 0;
  }
  ASSERT(pPreRes->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, 0);
}

5
54liuyao 已提交
1089
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
1090
  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
1091
  return getTableGroupId(pTableScanInfo->base.pTableListInfo, uid);
1092 1093
}

5
54liuyao 已提交
1094 1095 1096 1097 1098 1099 1100 1101
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  if (pInfo->partitionSup.needCalc) {
    return getGroupIdByCol(pInfo, uid, ts, maxVersion);
  }

  return getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1102
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
5
54liuyao 已提交
1103 1104 1105
  if (pBlock->info.rows == 0) {
    return false;
  }
L
Liu Jicong 已提交
1106 1107 1108 1109 1110 1111 1112 1113 1114 1115
  if ((*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  STimeWindow      win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
1116 1117 1118
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        gpData = (uint64_t*)pGpCol->pData;
  uint64_t         groupId = gpData[*pRowIndex];
1119 1120 1121 1122 1123 1124

  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSKEY*           calStartData = (TSKEY*)pCalStartTsCol->pData;
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSKEY*           calEndData = (TSKEY*)pCalEndTsCol->pData;

L
Liu Jicong 已提交
1125
  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
1126 1127 1128 1129
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[*pRowIndex];
    pInfo->updateWin.ekey = calEndData[*pRowIndex];
  }
L
Liu Jicong 已提交
1130 1131 1132
  (*pRowIndex)++;

  for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
1133
    if (win.skey == startData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
L
Liu Jicong 已提交
1134 1135 1136
      win.ekey = TMAX(win.ekey, endData[*pRowIndex]);
      continue;
    }
1137
    if (win.skey == endData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
L
Liu Jicong 已提交
1138 1139 1140
      win.skey = TMIN(win.skey, startData[*pRowIndex]);
      continue;
    }
1141 1142
    ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
           !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
L
Liu Jicong 已提交
1143 1144 1145
    break;
  }

L
fix bug  
liuyao 已提交
1146
  resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
1147
  pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1148 1149 1150
  return true;
}

5
54liuyao 已提交
1151
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1152
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1153
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1154
  dumyInfo.cur.pageId = -1;
1155
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1156 1157
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1158
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1159

5
54liuyao 已提交
1160
  while (1) {
1161 1162 1163
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1164
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1165 1166 1167 1168 1169
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1170
    }
5
54liuyao 已提交
1171

5
54liuyao 已提交
1172 1173 1174
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1175
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1176
    endWin = preWin;
5
54liuyao 已提交
1177
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1178 1179 1180 1181 1182 1183
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1184

L
Liu Jicong 已提交
1185
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
L
liuyao 已提交
1186
  qInfo("do stream range scan. windows index:%d", *pRowIndex);
L
liuyao 已提交
1187
  bool prepareRes = true;
L
Liu Jicong 已提交
1188 1189 1190
  while (1) {
    SSDataBlock* pResult = NULL;
    pResult = doTableScan(pInfo->pTableScanOp);
L
liuyao 已提交
1191 1192
    if (!pResult) {
      prepareRes = prepareRangeScan(pInfo, pSDB, pRowIndex);
L
Liu Jicong 已提交
1193 1194 1195 1196
      // scan next window data
      pResult = doTableScan(pInfo->pTableScanOp);
    }
    if (!pResult) {
L
liuyao 已提交
1197 1198 1199
      if (prepareRes) {
        continue;
      }
L
Liu Jicong 已提交
1200 1201
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
5
54liuyao 已提交
1202
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
H
Hongze Cheng 已提交
1203
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1204
      tsdbReaderClose(pTableScanInfo->base.dataReader);
D
dapan1121 已提交
1205
      qDebug("2");
H
Haojun Liao 已提交
1206
      pTableScanInfo->base.dataReader = NULL;
1207 1208
      return NULL;
    }
L
Liu Jicong 已提交
1209

H
Haojun Liao 已提交
1210
    doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
1211 1212 1213 1214
    if (pResult->info.rows == 0) {
      continue;
    }

1215 1216 1217 1218 1219 1220 1221 1222
    if (pInfo->partitionSup.needCalc) {
      SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
      blockDataCleanup(pResult);
      for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
        if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, tmpBlock, i) == pInfo->groupId) {
          for (int32_t j = 0; j < pInfo->pTableScanOp->exprSupp.numOfExprs; j++) {
            SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
            SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
L
Liu Jicong 已提交
1223 1224
            bool             isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
            char*            pSrcData = colDataGetData(pSrcCol, i);
1225
            colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull);
1226 1227 1228 1229
          }
          pResult->info.rows++;
        }
      }
H
Haojun Liao 已提交
1230 1231 1232

      blockDataDestroy(tmpBlock);

1233 1234 1235 1236
      if (pResult->info.rows > 0) {
        pResult->info.calWin = pInfo->updateWin;
        return pResult;
      }
H
Haojun Liao 已提交
1237
    } else if (pResult->info.id.groupId == pInfo->groupId) {
5
54liuyao 已提交
1238
      pResult->info.calWin = pInfo->updateWin;
1239
      return pResult;
5
54liuyao 已提交
1240 1241
    }
  }
1242
}
1243

1244
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
X
Xiaoyu Wang 已提交
1245
                                   SSessionKey* pKey) {
1246 1247 1248
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
X
Xiaoyu Wang 已提交
1249

1250 1251 1252 1253 1254
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
  int32_t          code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
D
dapan1121 已提交
1255 1256

  taosMemoryFree(pCur);
1257 1258 1259
  return code;
}

1260
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
5
54liuyao 已提交
1261
  blockDataCleanup(pDestBlock);
1262 1263
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
1264
  }
1265
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
1266
  if (code != TSDB_CODE_SUCCESS) {
1267
    return code;
L
Liu Jicong 已提交
1268
  }
1269 1270
  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
1271
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
1272
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
1273
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
1274 1275
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
L
Liu Jicong 已提交
1276

1277 1278
  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
5
54liuyao 已提交
1279
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
1280
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
5
54liuyao 已提交
1281 1282
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
L
Liu Jicong 已提交
1283
  int64_t          version = pSrcBlock->info.version - 1;
1284
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
1285
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
L
Liu Jicong 已提交
1286
    // gap must be 0.
5
54liuyao 已提交
1287
    SSessionKey startWin = {0};
1288
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
5
54liuyao 已提交
1289
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
L
Liu Jicong 已提交
1290 1291 1292
      // window has been closed.
      continue;
    }
5
54liuyao 已提交
1293 1294
    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
X
Xiaoyu Wang 已提交
1295
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
1296 1297 1298 1299
      getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    }
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
X
Xiaoyu Wang 已提交
1300
      qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
1301 1302
      continue;
    }
1303 1304
    colDataSetVal(pDestStartCol, i, (const char*)&startWin.win.skey, false);
    colDataSetVal(pDestEndCol, i, (const char*)&endWin.win.ekey, false);
5
54liuyao 已提交
1305

1306
    colDataSetNULL(pDestUidCol, i);
1307
    colDataSetVal(pDestGpCol, i, (const char*)&groupId, false);
1308 1309
    colDataSetNULL(pDestCalStartTsCol, i);
    colDataSetNULL(pDestCalEndTsCol, i);
1310
    pDestBlock->info.rows++;
L
Liu Jicong 已提交
1311
  }
1312
  return TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
1313
}
1314 1315 1316 1317 1318 1319

static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
1320
  }
1321

1322 1323
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
1324 1325
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
5
54liuyao 已提交
1326

L
Liu Jicong 已提交
1327
  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
1328
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
5
54liuyao 已提交
1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
    uint64_t     srcUid = srcUidData[0];
    TSKEY        startTs = srcStartTsCol[0];
    TSKEY        endTs = srcEndTsCol[0];
    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);
    printDataBlock(pPreRes, "pre res");
    blockDataCleanup(pSrcBlock);
    int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    rows = pPreRes->info.rows;

    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
      appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
                                       &groupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  srcUidData = (uint64_t*)pSrcUidCol->pData;

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

1365 1366
  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
1367
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
1368 1369 1370
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
1371
  for (int32_t i = 0; i < rows;) {
1372
    uint64_t srcUid = srcUidData[i];
5
54liuyao 已提交
1373 1374 1375 1376 1377
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
    TSKEY calStartTs = srcStartTsCol[i];
1378
    colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
5
54liuyao 已提交
1379
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
1380 1381
                                       pInfo->partitionSup.needCalc);
    TSKEY       calEndTs = srcStartTsCol[i - 1];
1382 1383 1384 1385 1386
    colDataSetVal(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataSetVal(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
1387
    pDestBlock->info.rows++;
5
54liuyao 已提交
1388
  }
1389 1390
  return TSDB_CODE_SUCCESS;
}
1391

1392
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
5
54liuyao 已提交
1393 1394 1395
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
1396 1397
    return TSDB_CODE_SUCCESS;
  }
5
54liuyao 已提交
1398
  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
1399 1400 1401 1402
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

5
54liuyao 已提交
1403 1404 1405 1406 1407 1408 1409 1410 1411 1412
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;
1413
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
5
54liuyao 已提交
1414 1415
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
L
Liu Jicong 已提交
1416
    char*    tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
5
54liuyao 已提交
1417 1418 1419
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
L
Liu Jicong 已提交
1420
    if (pInfo->tbnameCalSup.pExprInfo) {
1421 1422 1423
      void* parTbname = NULL;
      streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);

L
Liu Jicong 已提交
1424 1425
      memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
      varDataSetLen(tbname, strlen(varDataVal(tbname)));
dengyihao's avatar
dengyihao 已提交
1426
      streamFreeVal(parTbname);
L
Liu Jicong 已提交
1427 1428 1429
    }
    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     tbname[0] == 0 ? NULL : tbname);
1430 1431 1432 1433
  }
  return TSDB_CODE_SUCCESS;
}

1434 1435 1436 1437
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (isIntervalWindow(pInfo)) {
    code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
1438
  } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
1439
    code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
5
54liuyao 已提交
1440 1441
  } else {
    code = generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
1442
  }
1443
  pDestBlock->info.type = STREAM_CLEAR;
1444
  pDestBlock->info.version = pSrcBlock->info.version;
1445
  pDestBlock->info.dataLoad = 1;
1446 1447 1448 1449
  blockDataUpdateTsWindow(pDestBlock, 0);
  return code;
}

5
54liuyao 已提交
1450
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
1451
  SExprSupp*    pTbNameCalSup = &pInfo->tbnameCalSup;
5
54liuyao 已提交
1452 1453
  blockDataCleanup(pInfo->pCreateTbRes);
  if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
L
Liu Jicong 已提交
1454
    pBlock->info.parTbName[0] = 0;
L
Liu Jicong 已提交
1455
  } else {
5
54liuyao 已提交
1456 1457
    appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
                         pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes);
L
Liu Jicong 已提交
1458
  }
L
Liu Jicong 已提交
1459 1460
}

1461 1462
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
1463 1464
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
1465 1466
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
1467 1468
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
1469
  SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
1470 1471 1472 1473 1474 1475 1476
  colDataSetVal(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
  colDataSetVal(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
  colDataSetVal(pUidCol, pBlock->info.rows, (const char*)pUid, false);
  colDataSetVal(pGpCol, pBlock->info.rows, (const char*)pGp, false);
  colDataSetVal(pCalStartCol, pBlock->info.rows, (const char*)pStartTs, false);
  colDataSetVal(pCalEndCol, pBlock->info.rows, (const char*)pEndTs, false);
  colDataSetVal(pTableCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL);
1477
  pBlock->info.rows++;
5
54liuyao 已提交
1478 1479
}

1480
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
1481 1482
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
1483
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
1484
  }
1485 1486
  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
5
54liuyao 已提交
1487
  TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
H
Haojun Liao 已提交
1488
  bool   tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);
1489
  for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
5
54liuyao 已提交
1490 1491
    SResultRowInfo dumyInfo;
    dumyInfo.cur.pageId = -1;
L
Liu Jicong 已提交
1492
    bool        isClosed = false;
5
54liuyao 已提交
1493
    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
X
Xiaoyu Wang 已提交
1494
    bool        overDue = isOverdue(tsCol[rowId], &pInfo->twAggSup);
1495 1496 1497 1498 1499
    if (pInfo->igExpired && overDue) {
      continue;
    }

    if (tableInserted && overDue) {
5
54liuyao 已提交
1500 1501 1502
      win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }
5
54liuyao 已提交
1503
    // must check update info first.
H
Haojun Liao 已提交
1504
    bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]);
L
Liu Jicong 已提交
1505
    bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
H
Haojun Liao 已提交
1506
                     isDeletedStreamWindow(&win, pBlock->info.id.groupId,
L
liuyao 已提交
1507
                                           pInfo->pState, &pInfo->twAggSup);
L
Liu Jicong 已提交
1508
    if ((update || closedWin) && out) {
L
Liu Jicong 已提交
1509
      qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
5
54liuyao 已提交
1510
      uint64_t gpId = 0;
H
Haojun Liao 已提交
1511
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId,
1512
                                       NULL);
5
54liuyao 已提交
1513 1514
      if (closedWin && pInfo->partitionSup.needCalc) {
        gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
S
slzhou 已提交
1515 1516
        appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid,
                                         &gpId, NULL);
5
54liuyao 已提交
1517
      }
1518 1519
    }
  }
1520 1521
  if (out && pInfo->pUpdateDataRes->info.rows > 0) {
    pInfo->pUpdateDataRes->info.version = pBlock->info.version;
1522
    pInfo->pUpdateDataRes->info.dataLoad = 1;
1523
    blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
1524
    pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
5
54liuyao 已提交
1525 1526
  }
}
L
Liu Jicong 已提交
1527

1528
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
L
Liu Jicong 已提交
1529 1530
  SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
L
Liu Jicong 已提交
1531
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
L
Liu Jicong 已提交
1532

1533 1534
  blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);

L
Liu Jicong 已提交
1535
  pInfo->pRes->info.rows = pBlock->info.rows;
H
Haojun Liao 已提交
1536
  pInfo->pRes->info.id.uid = pBlock->info.id.uid;
L
Liu Jicong 已提交
1537
  pInfo->pRes->info.type = STREAM_NORMAL;
1538
  pInfo->pRes->info.version = pBlock->info.version;
L
Liu Jicong 已提交
1539

1540
  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
1541
  pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
L
Liu Jicong 已提交
1542 1543

  // todo extract method
H
Haojun Liao 已提交
1544 1545 1546
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
L
Liu Jicong 已提交
1547 1548 1549 1550 1551 1552 1553
      continue;
    }

    bool colExists = false;
    for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
      SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
      if (pResCol->info.colId == pColMatchInfo->colId) {
H
Haojun Liao 已提交
1554
        SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
1555
        colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
L
Liu Jicong 已提交
1556 1557 1558 1559 1560 1561 1562
        colExists = true;
        break;
      }
    }

    // the required column does not exists in submit block, let's set it to be all null value
    if (!colExists) {
H
Haojun Liao 已提交
1563
      SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
1564
      colDataSetNNULL(pDst, 0, pBlockInfo->rows);
L
Liu Jicong 已提交
1565 1566 1567 1568 1569
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
L
Liu Jicong 已提交
1570
    int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
1571
                                          pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
K
kailixu 已提交
1572 1573
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
L
Liu Jicong 已提交
1574
      blockDataFreeRes((SSDataBlock*)pBlock);
1575
      T_LONG_JMP(pTaskInfo->env, code);
H
Haojun Liao 已提交
1576
    }
K
kailixu 已提交
1577 1578 1579

    // reset the error code.
    terrno = 0;
L
Liu Jicong 已提交
1580 1581
  }

1582
  if (filter) {
H
Haojun Liao 已提交
1583
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
1584
  }
1585

1586
  pInfo->pRes->info.dataLoad = 1;
L
Liu Jicong 已提交
1587
  blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
L
Liu Jicong 已提交
1588
  blockDataFreeRes((SSDataBlock*)pBlock);
L
Liu Jicong 已提交
1589

L
Liu Jicong 已提交
1590
  calBlockTbName(pInfo, pInfo->pRes);
L
Liu Jicong 已提交
1591 1592
  return 0;
}
5
54liuyao 已提交
1593

L
Liu Jicong 已提交
1594
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
1595 1596
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;
X
Xiaoyu Wang 已提交
1597
  const char*      id = GET_TASKID(pTaskInfo);
H
Haojun Liao 已提交
1598

1599
  qDebug("start to exec queue scan, %s", id);
L
Liu Jicong 已提交
1600

L
Liu Jicong 已提交
1601
  if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
L
Liu Jicong 已提交
1602
    if (pInfo->tqReader->msg2.msgStr == NULL) {
L
Liu Jicong 已提交
1603
      SPackedData submit = pTaskInfo->streamInfo.submit;
L
Liu Jicong 已提交
1604
      if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) {
1605
        qError("submit msg messed up when initing stream submit block %p, %s", submit.msgStr, id);
L
Liu Jicong 已提交
1606
        pInfo->tqReader->msg2 = (SPackedData){0};
L
Liu Jicong 已提交
1607
        pInfo->tqReader->setMsg = 0;
L
Liu Jicong 已提交
1608 1609 1610 1611 1612 1613 1614
        ASSERT(0);
      }
    }

    blockDataCleanup(pInfo->pRes);
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

L
Liu Jicong 已提交
1615
    while (tqNextDataBlock2(pInfo->tqReader)) {
L
Liu Jicong 已提交
1616 1617
      SSDataBlock block = {0};

1618
      int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
L
Liu Jicong 已提交
1619 1620 1621 1622
      if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
        continue;
      }

1623
      setBlockIntoRes(pInfo, &block, true);
L
Liu Jicong 已提交
1624 1625 1626 1627 1628 1629

      if (pBlockInfo->rows > 0) {
        return pInfo->pRes;
      }
    }

L
Liu Jicong 已提交
1630
    pInfo->tqReader->msg2 = (SPackedData){0};
L
Liu Jicong 已提交
1631
    pInfo->tqReader->setMsg = 0;
L
Liu Jicong 已提交
1632
    pTaskInfo->streamInfo.submit = (SPackedData){0};
L
Liu Jicong 已提交
1633
    return NULL;
L
Liu Jicong 已提交
1634 1635
  }

L
Liu Jicong 已提交
1636 1637 1638
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
X
Xiaoyu Wang 已提交
1639
      qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64 " %s",
dengyihao's avatar
dengyihao 已提交
1640
             pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey,
X
Xiaoyu Wang 已提交
1641
             pInfo->tqReader->pWalReader->curVersion, id);
1642
      pTaskInfo->streamInfo.returned = 1;
L
Liu Jicong 已提交
1643 1644
      return pResult;
    } else {
1645
      // no data has return already, try to extract data in the WAL
1646 1647
      if (!pTaskInfo->streamInfo.returned) {
        STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1648 1649
        tsdbReaderClose(pTSInfo->base.dataReader);
        pTSInfo->base.dataReader = NULL;
1650
        tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
1651 1652

        qDebug("queue scan tsdb over, switch to wal ver:%" PRId64 " %s", pTaskInfo->streamInfo.snapshotVer + 1, id);
H
Haojun Liao 已提交
1653
        if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
1654
          tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
1655 1656 1657
          return NULL;
        }
      } else {
L
Liu Jicong 已提交
1658 1659
        return NULL;
      }
1660 1661 1662
    }
  }

L
Liu Jicong 已提交
1663 1664 1665
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
    while (1) {
      SFetchRet ret = {0};
H
Haojun Liao 已提交
1666 1667
      terrno = 0;

1668
      if (tqNextBlock(pInfo->tqReader, &ret) < 0) {
1669 1670
        // if the end is reached, terrno is 0
        if (terrno != 0) {
1671
          qError("failed to get next log block since %s, %s", terrstr(), id);
1672
        }
1673
      }
1674

L
Liu Jicong 已提交
1675 1676
      if (ret.fetchType == FETCH_TYPE__DATA) {
        blockDataCleanup(pInfo->pRes);
1677
        setBlockIntoRes(pInfo, &ret.data, true);
L
Liu Jicong 已提交
1678
        if (pInfo->pRes->info.rows > 0) {
L
Liu Jicong 已提交
1679
          pOperator->status = OP_EXEC_RECV;
D
dapan1121 已提交
1680
          qDebug("queue scan log return %" PRId64 " rows", pInfo->pRes->info.rows);
L
Liu Jicong 已提交
1681 1682 1683
          return pInfo->pRes;
        }
      } else if (ret.fetchType == FETCH_TYPE__META) {
1684
        qError("unexpected ret.fetchType:%d", ret.fetchType);
1685
        continue;
L
Liu Jicong 已提交
1686 1687
      } else if (ret.fetchType == FETCH_TYPE__NONE ||
                 (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
L
Liu Jicong 已提交
1688
        pTaskInfo->streamInfo.lastStatus = ret.offset;
1689 1690
        char formatBuf[80];
        tFormatOffset(formatBuf, 80, &ret.offset);
L
Liu Jicong 已提交
1691
        qDebug("queue scan log return null, offset %s", formatBuf);
L
Liu Jicong 已提交
1692
        pOperator->status = OP_OPENED;
L
Liu Jicong 已提交
1693 1694 1695
        return NULL;
      }
    }
L
Liu Jicong 已提交
1696
  } else {
1697
    qError("unexpected streamInfo prepare type: %d %s", pTaskInfo->streamInfo.prepareStatus.type, id);
L
Liu Jicong 已提交
1698
    return NULL;
H
Haojun Liao 已提交
1699
  }
L
Liu Jicong 已提交
1700 1701
}

L
Liu Jicong 已提交
1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;
  blockDataEnsureCapacity(pDst, rows);

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  uint64_t*        startCol = (uint64_t*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  uint64_t*        endCol = (uint64_t*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  int32_t          j = 0;
  for (int32_t i = 0; i < rows; i++) {
    if (taosHashGet(pReader->tbIdHash, &uidCol[i], sizeof(uint64_t))) {
1720 1721 1722
      colDataSetVal(pDstStartCol, j, (const char*)&startCol[i], false);
      colDataSetVal(pDstEndCol, j, (const char*)&endCol[i], false);
      colDataSetVal(pDstUidCol, j, (const char*)&uidCol[i], false);
L
Liu Jicong 已提交
1723

1724 1725 1726
      colDataSetNULL(taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX), j);
      colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), j);
      colDataSetNULL(taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), j);
L
Liu Jicong 已提交
1727 1728 1729
      j++;
    }
  }
L
Liu Jicong 已提交
1730
  uint32_t cap = pDst->info.capacity;
L
Liu Jicong 已提交
1731 1732
  pDst->info = pSrc->info;
  pDst->info.rows = j;
L
Liu Jicong 已提交
1733
  pDst->info.capacity = cap;
L
Liu Jicong 已提交
1734 1735 1736 1737

  return 0;
}

5
54liuyao 已提交
1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749
// for partition by tag
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startTsCol = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        gpCol = (uint64_t*)pGpCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;
  if (!pInfo->partitionSup.needCalc) {
    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
1750
      colDataSetVal(pGpCol, i, (const char*)&groupId, false);
5
54liuyao 已提交
1751 1752 1753 1754
    }
  }
}

5
54liuyao 已提交
1755
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
5
54liuyao 已提交
1756
  if (pInfo->pUpdateInfo) {
L
fix bug  
liuyao 已提交
1757
    pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
5
54liuyao 已提交
1758
    checkUpdateData(pInfo, true, pBlock, true);
5
54liuyao 已提交
1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769
    pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
    if (pInfo->pUpdateDataRes->info.rows > 0) {
      pInfo->updateResIndex = 0;
      if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
        pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
        // return pInfo->pUpdateDataRes;
      } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
        pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
      }
5
54liuyao 已提交
1770 1771 1772 1773
    }
  }
}

L
liuyao 已提交
1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786
int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
  int32_t len = updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
  *pBuff = taosMemoryCalloc(1, len);
  updateInfoSerialize(*pBuff, len, pInfo->pUpdateInfo);
  return len;
}

// other properties are recovered from the execution plan
void streamScanOperatorDeocde(void* pBuff, int32_t len, SStreamScanInfo* pInfo) {
  if (!pBuff) {
    return;
  }

1787 1788
  SUpdateInfo* pUpInfo = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
  int32_t      code = updateInfoDeserialize(pBuff, len, pUpInfo);
L
liuyao 已提交
1789 1790 1791 1792 1793
  if (code == TSDB_CODE_SUCCESS) {
    pInfo->pUpdateInfo = pUpInfo;
  }
}

L
Liu Jicong 已提交
1794 1795 1796 1797 1798
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

L
Liu Jicong 已提交
1799
  qDebug("stream scan called");
H
Haojun Liao 已提交
1800

1801 1802
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
L
Liu Jicong 已提交
1803
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1804
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
1805
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
H
Haojun Liao 已提交
1806 1807 1808 1809
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
      qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1810
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
1811
    } else {
H
Haojun Liao 已提交
1812 1813 1814 1815
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
      qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1816
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
1817
    }
L
Liu Jicong 已提交
1818

H
Haojun Liao 已提交
1819
    tsdbReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1820 1821
    qDebug("4");

H
Haojun Liao 已提交
1822
    pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
1823
    pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1824

L
Liu Jicong 已提交
1825 1826
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
L
Liu Jicong 已提交
1827
    pTaskInfo->streamInfo.recoverScanFinished = false;
L
Liu Jicong 已提交
1828 1829
  }

5
54liuyao 已提交
1830 1831
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
L
Liu Jicong 已提交
1832 1833 1834 1835 1836
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }
5
54liuyao 已提交
1837 1838 1839 1840 1841 1842 1843

    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        printDataBlock(pInfo->pRecoverRes, "scan recover");
        return pInfo->pRecoverRes;
      } break;
5
54liuyao 已提交
1844 1845 1846 1847
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
1848
        printDataBlock(pInfo->pUpdateRes, "recover update");
5
54liuyao 已提交
1849 1850
        return pInfo->pUpdateRes;
      } break;
1851 1852 1853 1854 1855 1856 1857 1858 1859
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        printDataBlock(pInfo->pDeleteDataRes, "recover delete");
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
1860 1861 1862 1863 1864 1865
      case STREAM_SCAN_FROM_DATAREADER_RANGE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
1866
          printDataBlock(pSDB, "scan recover update");
5
54liuyao 已提交
1867 1868 1869 1870 1871 1872
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
5
54liuyao 已提交
1873 1874 1875 1876 1877 1878
      default:
        break;
    }

    pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
    if (pInfo->pRecoverRes != NULL) {
L
Liu Jicong 已提交
1879
      pInfo->blockRecoverContiCnt++;
5
54liuyao 已提交
1880
      calBlockTbName(pInfo, pInfo->pRecoverRes);
1881
      if (pInfo->pUpdateInfo) {
5
54liuyao 已提交
1882 1883 1884 1885 1886 1887
        if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
          TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
        } else {
          doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
        }
1888
      }
5
54liuyao 已提交
1889 1890
      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
1891
        printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
5
54liuyao 已提交
1892 1893
        return pInfo->pCreateTbRes;
      }
X
Xiaoyu Wang 已提交
1894
      qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
5
54liuyao 已提交
1895 1896
      printDataBlock(pInfo->pRecoverRes, "scan recover");
      return pInfo->pRecoverRes;
L
Liu Jicong 已提交
1897 1898
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
L
Liu Jicong 已提交
1899
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1900
    tsdbReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1901 1902
    qDebug("5");

H
Haojun Liao 已提交
1903
    pTSInfo->base.dataReader = NULL;
1904

H
Haojun Liao 已提交
1905 1906
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
L
Liu Jicong 已提交
1907

L
Liu Jicong 已提交
1908
    pTaskInfo->streamInfo.recoverScanFinished = true;
L
Liu Jicong 已提交
1909 1910 1911
    return NULL;
  }

5
54liuyao 已提交
1912
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
5
54liuyao 已提交
1913
// TODO: refactor
L
Liu Jicong 已提交
1914
FETCH_NEXT_BLOCK:
L
Liu Jicong 已提交
1915
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
1916
    if (pInfo->validBlockIndex >= total) {
L
Liu Jicong 已提交
1917
      doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
1918
      /*pOperator->status = OP_EXEC_DONE;*/
H
Haojun Liao 已提交
1919 1920 1921
      return NULL;
    }

1922
    int32_t      current = pInfo->validBlockIndex++;
L
Liu Jicong 已提交
1923 1924
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
5
54liuyao 已提交
1925
    if (pBlock->info.parTbName[0]) {
H
Haojun Liao 已提交
1926
      streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
1927
    }
1928
    // TODO move into scan
5
54liuyao 已提交
1929 1930
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
1931
    pBlock->info.dataLoad = 1;
L
fix bug  
liuyao 已提交
1932 1933 1934
    if (pInfo->pUpdateInfo) {
      pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
    }
1935
    blockDataUpdateTsWindow(pBlock, 0);
1936
    switch (pBlock->info.type) {
L
Liu Jicong 已提交
1937 1938 1939
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
1940 1941 1942
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
1943
        copyDataBlock(pInfo->pUpdateRes, pBlock);
L
liuyao 已提交
1944
        pInfo->updateResIndex = 0;
1945
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
1946 1947 1948
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
1949
        printDataBlock(pBlock, "stream scan delete recv");
L
Liu Jicong 已提交
1950
        SSDataBlock* pDelBlock = NULL;
L
Liu Jicong 已提交
1951
        if (pInfo->tqReader) {
L
Liu Jicong 已提交
1952
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
L
Liu Jicong 已提交
1953
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
L
Liu Jicong 已提交
1954 1955
        } else {
          pDelBlock = pBlock;
L
Liu Jicong 已提交
1956
        }
5
54liuyao 已提交
1957 1958
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
5
54liuyao 已提交
1959 1960 1961 1962 1963 1964
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
1965
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
L
Liu Jicong 已提交
1966
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
1967
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
L
Liu Jicong 已提交
1968
          printDataBlock(pDelBlock, "stream scan delete result");
H
Haojun Liao 已提交
1969 1970
          blockDataDestroy(pDelBlock);

L
Liu Jicong 已提交
1971 1972 1973 1974 1975
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1976 1977 1978
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
L
Liu Jicong 已提交
1979
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
1980 1981 1982
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
L
Liu Jicong 已提交
1983 1984 1985 1986
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
L
Liu Jicong 已提交
1987
          if (pInfo->pDeleteDataRes->info.rows > 0) {
5
54liuyao 已提交
1988
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
L
Liu Jicong 已提交
1989 1990 1991 1992
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1993
        }
1994 1995 1996
      } break;
      default:
        break;
5
54liuyao 已提交
1997
    }
1998
    // printDataBlock(pBlock, "stream scan recv");
1999
    return pBlock;
L
Liu Jicong 已提交
2000
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
L
Liu Jicong 已提交
2001
    qDebug("scan mode %d", pInfo->scanMode);
5
54liuyao 已提交
2002 2003 2004
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
5
54liuyao 已提交
2005
        doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
5
54liuyao 已提交
2006 2007 2008
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
2009 2010 2011
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
5
54liuyao 已提交
2012
      } break;
2013
      case STREAM_SCAN_FROM_DELETE_DATA: {
2014 2015 2016 2017 2018 2019 2020
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
2021 2022 2023 2024 2025 2026 2027 2028 2029 2030
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
2031
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
5
54liuyao 已提交
2032 2033
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
L
liuyao 已提交
2034
          printDataBlock(pSDB, "stream scan update");
L
Liu Jicong 已提交
2035
          calBlockTbName(pInfo, pSDB);
5
54liuyao 已提交
2036 2037
          return pSDB;
        }
2038
        blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
2039 2040 2041 2042
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
2043
    }
2044

2045
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
5
54liuyao 已提交
2046
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
2047 2048
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
5
54liuyao 已提交
2049 2050
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
2051
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
2052
      pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
2053
      return pInfo->pUpdateRes;
5
54liuyao 已提交
2054
    }
5
54liuyao 已提交
2055

H
Haojun Liao 已提交
2056 2057
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

2058
    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
2059

L
Liu Jicong 已提交
2060
  NEXT_SUBMIT_BLK:
2061
    while (1) {
L
Liu Jicong 已提交
2062
      if (pInfo->tqReader->msg2.msgStr == NULL) {
2063
        if (pInfo->validBlockIndex >= totBlockNum) {
5
54liuyao 已提交
2064
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
L
Liu Jicong 已提交
2065
          doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
2066
          qDebug("stream scan return empty, consume block %d", totBlockNum);
L
liuyao 已提交
2067 2068
          // void* buff = NULL;
          // int32_t len = streamScanOperatorEncode(pInfo, &buff);
2069
          // todo(liuyao) save buff
L
liuyao 已提交
2070
          // taosMemoryFreeClear(buff);
2071 2072
          return NULL;
        }
2073

L
Liu Jicong 已提交
2074 2075
        int32_t      current = pInfo->validBlockIndex++;
        SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
L
Liu Jicong 已提交
2076
        /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
L
Liu Jicong 已提交
2077
        if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
2078 2079 2080 2081
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          continue;
        }
H
Haojun Liao 已提交
2082 2083
      }

2084 2085
      blockDataCleanup(pInfo->pRes);

L
Liu Jicong 已提交
2086
      while (tqNextDataBlock2(pInfo->tqReader)) {
2087
        SSDataBlock block = {0};
2088

2089
        int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
2090 2091 2092 2093 2094

        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

2095
        setBlockIntoRes(pInfo, &block, false);
2096

5
54liuyao 已提交
2097 2098 2099
        if (pInfo->pCreateTbRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return pInfo->pCreateTbRes;
2100 2101
        }

5
54liuyao 已提交
2102
        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
H
Haojun Liao 已提交
2103
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
2104
        pInfo->pRes->info.dataLoad = 1;
2105 2106 2107
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
2108 2109 2110
          break;
        }
      }
2111
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
5
54liuyao 已提交
2112
        break;
J
jiacy-jcy 已提交
2113
      } else {
2114
        continue;
5
54liuyao 已提交
2115
      }
H
Haojun Liao 已提交
2116 2117 2118 2119
    }

    // record the scan action.
    pInfo->numOfExec++;
2120
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
L
liuyao 已提交
2121
    // printDataBlock(pInfo->pRes, "stream scan");
H
Haojun Liao 已提交
2122

X
Xiaoyu Wang 已提交
2123
    qDebug("scan rows: %" PRId64, pBlockInfo->rows);
L
Liu Jicong 已提交
2124 2125 2126
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }
2127 2128 2129 2130 2131 2132

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
L
Liu Jicong 已提交
2133 2134 2135
  } else {
    ASSERT(0);
    return NULL;
H
Haojun Liao 已提交
2136 2137 2138
  }
}

H
Haojun Liao 已提交
2139
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2140 2141 2142
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2143 2144 2145
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2146 2147 2148 2149 2150 2151
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2152
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
L
Liu Jicong 已提交
2153 2154
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
2155
  SStreamRawScanInfo* pInfo = pOperator->info;
D
dapan1121 已提交
2156
  int32_t             code = TSDB_CODE_SUCCESS;
L
Liu Jicong 已提交
2157
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;  // use metaRspLen !=0 to judge if data is meta
wmmhello's avatar
wmmhello 已提交
2158
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
2159

wmmhello's avatar
wmmhello 已提交
2160
  qDebug("tmqsnap doRawScan called");
L
Liu Jicong 已提交
2161
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
D
dapan1121 已提交
2162 2163 2164 2165 2166
    bool hasNext = false;
    if (pInfo->dataReader) {
      code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
      if (code) {
        tsdbReleaseDataBlock(pInfo->dataReader);
2167
        T_LONG_JMP(pTaskInfo->env, code);
D
dapan1121 已提交
2168 2169
      }
    }
X
Xiaoyu Wang 已提交
2170

D
dapan1121 已提交
2171
    if (pInfo->dataReader && hasNext) {
wmmhello's avatar
wmmhello 已提交
2172
      if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2173
        tsdbReleaseDataBlock(pInfo->dataReader);
2174
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
wmmhello's avatar
wmmhello 已提交
2175
      }
2176

H
Haojun Liao 已提交
2177 2178
      SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
      if (pBlock == NULL) {
2179
        T_LONG_JMP(pTaskInfo->env, terrno);
wmmhello's avatar
wmmhello 已提交
2180 2181
      }

H
Haojun Liao 已提交
2182
      qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
wmmhello's avatar
wmmhello 已提交
2183
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
H
Haojun Liao 已提交
2184
      pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
wmmhello's avatar
wmmhello 已提交
2185 2186 2187
      pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
      return pBlock;
    }
wmmhello's avatar
wmmhello 已提交
2188 2189

    SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
L
Liu Jicong 已提交
2190
    if (mtInfo.uid == 0) {  // read snapshot done, change to get data from wal
wmmhello's avatar
wmmhello 已提交
2191 2192
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
wmmhello's avatar
wmmhello 已提交
2193 2194
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
      pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
L
Liu Jicong 已提交
2195
    } else {
wmmhello's avatar
wmmhello 已提交
2196 2197
      pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
      pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
2198
      qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
wmmhello's avatar
wmmhello 已提交
2199 2200
      qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
    }
2201
    tDeleteSSchemaWrapper(mtInfo.schema);
wmmhello's avatar
wmmhello 已提交
2202
    qDebug("tmqsnap stream scan tsdb return null");
wmmhello's avatar
wmmhello 已提交
2203
    return NULL;
L
Liu Jicong 已提交
2204 2205 2206 2207 2208 2209 2210
  } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
    SSnapContext* sContext = pInfo->sContext;
    void*         data = NULL;
    int32_t       dataLen = 0;
    int16_t       type = 0;
    int64_t       uid = 0;
    if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
wmmhello's avatar
wmmhello 已提交
2211
      qError("tmqsnap getMetafromSnapShot error");
wmmhello's avatar
wmmhello 已提交
2212
      taosMemoryFreeClear(data);
2213 2214 2215
      return NULL;
    }

L
Liu Jicong 已提交
2216
    if (!sContext->queryMetaOrData) {  // change to get data next poll request
wmmhello's avatar
wmmhello 已提交
2217 2218 2219 2220
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
      pTaskInfo->streamInfo.lastStatus.uid = uid;
      pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
wmmhello's avatar
wmmhello 已提交
2221
      pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
L
Liu Jicong 已提交
2222
    } else {
wmmhello's avatar
wmmhello 已提交
2223 2224 2225 2226 2227 2228 2229
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
      pTaskInfo->streamInfo.lastStatus.uid = uid;
      pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
      pTaskInfo->streamInfo.metaRsp.resMsgType = type;
      pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
      pTaskInfo->streamInfo.metaRsp.metaRsp = data;
    }
2230

wmmhello's avatar
wmmhello 已提交
2231
    return NULL;
2232
  }
L
Liu Jicong 已提交
2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270
  //  else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
  //    int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
  //
  //    while(1){
  //      if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
  //        qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
  //        pTaskInfo->streamInfo.lastStatus.version = fetchVer;
  //        pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
  //        return NULL;
  //      }
  //      SWalCont* pHead = &pInfo->pCkHead->head;
  //      qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
  //
  //      if (pHead->msgType == TDMT_VND_SUBMIT) {
  //        SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
  //        tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
  //        SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid,
  //        &pInfo->pRes); if(block){
  //          pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
  //          pTaskInfo->streamInfo.lastStatus.version = fetchVer;
  //          qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
  //          return block;
  //        }else{
  //          fetchVer++;
  //        }
  //      } else{
  //        ASSERT(pInfo->sContext->withMeta);
  //        ASSERT(IS_META_MSG(pHead->msgType));
  //        qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
  //        pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
  //        pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
  //        pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
  //        pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
  //        pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
  //        memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
  //        return NULL;
  //      }
  //    }
2271 2272 2273
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2274
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2275 2276 2277
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
2278
  tableListDestroy(pRawScan->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2279 2280 2281
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2282 2283 2284
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2285
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2286 2287 2288 2289 2290
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2291 2292
  int32_t code = TSDB_CODE_SUCCESS;

2293
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2294
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2295
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2296 2297
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2298 2299
  }

2300
  pInfo->pTableListInfo = tableListCreate();
wmmhello's avatar
wmmhello 已提交
2301 2302
  pInfo->vnode = pHandle->vnode;

2303
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2304 2305
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2306

2307
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2308
  return pOperator;
H
Haojun Liao 已提交
2309

L
Liu Jicong 已提交
2310
_end:
H
Haojun Liao 已提交
2311 2312 2313 2314
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2315 2316
}

2317
static void destroyStreamScanOperatorInfo(void* param) {
2318
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
2319

2320
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
5
54liuyao 已提交
2321
    destroyOperatorInfo(pStreamScan->pTableScanOp);
2322
  }
2323

2324 2325 2326
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
H
Haojun Liao 已提交
2327 2328
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2329
  }
C
Cary Xu 已提交
2330 2331
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2332
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2333
  }
C
Cary Xu 已提交
2334

L
Liu Jicong 已提交
2335
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
5
54liuyao 已提交
2336
  cleanupExprSupp(&pStreamScan->tagCalSup);
L
Liu Jicong 已提交
2337

L
Liu Jicong 已提交
2338
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2339 2340 2341 2342
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2343
  blockDataDestroy(pStreamScan->pUpdateDataRes);
5
54liuyao 已提交
2344
  blockDataDestroy(pStreamScan->pCreateTbRes);
2345 2346 2347 2348
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2349
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2350
                                            STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2351
  SArray*          pColIds = NULL;
2352 2353
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2354

H
Haojun Liao 已提交
2355
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2356
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2357
    tableListDestroy(pTableListInfo);
2358
    goto _error;
H
Haojun Liao 已提交
2359 2360
  }

2361
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2362
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2363

2364
  pInfo->pTagCond = pTagCond;
2365
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2366

2367
  int32_t numOfCols = 0;
2368 2369
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2370
  if (code != TSDB_CODE_SUCCESS) {
2371
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2372 2373
    goto _error;
  }
2374

H
Haojun Liao 已提交
2375
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2376
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2377
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2378
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2379 2380

    int16_t colId = id->colId;
2381
    taosArrayPush(pColIds, &colId);
2382
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2383
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2384
    }
H
Haojun Liao 已提交
2385 2386
  }

L
Liu Jicong 已提交
2387 2388 2389 2390
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2391
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2392 2393
      goto _error;
    }
2394

L
Liu Jicong 已提交
2395 2396 2397
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
2398
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2399 2400 2401 2402
      goto _error;
    }
  }

2403 2404
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
5
54liuyao 已提交
2405
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
2406 2407
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2408
      tableListDestroy(pTableListInfo);
2409 2410 2411 2412
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2413
      tableListDestroy(pTableListInfo);
2414 2415 2416 2417
      goto _error;
    }
  }

L
Liu Jicong 已提交
2418
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
H
Haojun Liao 已提交
2419
  if (pInfo->pBlockLists == NULL) {
2420
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2421
    tableListDestroy(pTableListInfo);
2422
    goto _error;
H
Haojun Liao 已提交
2423 2424
  }

5
54liuyao 已提交
2425
  if (pHandle->vnode) {
2426
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
L
Liu Jicong 已提交
2427
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2428
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2429
      pTSInfo->base.cond.endVersion = pHandle->version;
2430
    }
L
Liu Jicong 已提交
2431

2432
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2433
    int32_t        num = 0;
2434
    tableListGetGroupList(pTableListInfo, 0, &pList, &num);
2435

2436
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2437
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2438
      pTSInfo->base.dataReader = NULL;
2439
      pTaskInfo->streamInfo.lastStatus.uid = -1;
L
Liu Jicong 已提交
2440 2441
    }

L
Liu Jicong 已提交
2442 2443 2444 2445
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2446
    } else {
L
Liu Jicong 已提交
2447 2448
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2449 2450
    }

2451
    pInfo->pUpdateInfo = NULL;
2452
    pInfo->pTableScanOp = pTableScanOp;
2453 2454 2455
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2456

L
Liu Jicong 已提交
2457 2458
    pInfo->readHandle = *pHandle;
    pInfo->tableUid = pScanPhyNode->uid;
L
Liu Jicong 已提交
2459
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
5
54liuyao 已提交
2460 2461
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
L
Liu Jicong 已提交
2462

L
Liu Jicong 已提交
2463
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2464
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
2465
    SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
2466
    code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2467 2468 2469 2470
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
2471

L
Liu Jicong 已提交
2472
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2473
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2474 2475
  } else {
    taosArrayDestroy(pColIds);
2476
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2477
    pColIds = NULL;
5
54liuyao 已提交
2478 2479
  }

2480 2481 2482 2483 2484
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2485 2486 2487 2488 2489
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2490
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2491
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2492
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2493
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2494
  pInfo->groupId = 0;
2495
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2496
  pInfo->pStreamScanOp = pOperator;
2497
  pInfo->deleteDataIndex = 0;
2498
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2499
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2500
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2501
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2502
  pInfo->partitionSup.needCalc = false;
5
54liuyao 已提交
2503 2504
  pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
  pInfo->igExpired = pTableScanNode->igExpired;
2505
  pInfo->twAggSup.maxTs = INT64_MIN;
L
liuyao 已提交
2506
  pInfo->pState = NULL;
L
Liu Jicong 已提交
2507

2508 2509
  // todo(liuyao) get buff from rocks db;
  void*   buff = NULL;
L
liuyao 已提交
2510 2511 2512
  int32_t len = 0;
  streamScanOperatorDeocde(buff, len, pInfo);

L
Liu Jicong 已提交
2513 2514
  setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2515
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2516

2517
  __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2518 2519
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2520

H
Haojun Liao 已提交
2521
  return pOperator;
2522

L
Liu Jicong 已提交
2523
_error:
H
Haojun Liao 已提交
2524 2525 2526 2527 2528 2529 2530 2531
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2532 2533
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2534 2535
}

2536
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
H
Haojun Liao 已提交
2537 2538 2539 2540
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

2541 2542 2543
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  STagScanInfo* pInfo = pOperator->info;
2544
  SExprInfo*    pExprInfo = &pOperator->exprSupp.pExprInfo[0];
2545
  SSDataBlock*  pRes = pInfo->pRes;
2546
  blockDataCleanup(pRes);
H
Haojun Liao 已提交
2547

2548
  int32_t size = tableListGetSize(pInfo->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2549
  if (size == 0) {
H
Haojun Liao 已提交
2550 2551 2552 2553
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

2554 2555 2556
  char        str[512] = {0};
  int32_t     count = 0;
  SMetaReader mr = {0};
2557
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);
H
Haojun Liao 已提交
2558

wmmhello's avatar
wmmhello 已提交
2559
  while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
2560
    STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
L
Liu Jicong 已提交
2561
    int32_t        code = metaGetTableEntryByUid(&mr, item->uid);
2562
    tDecoderClear(&mr.coder);
H
Haojun Liao 已提交
2563
    if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
2564 2565
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
2566
      metaReaderClear(&mr);
2567
      T_LONG_JMP(pTaskInfo->env, terrno);
H
Haojun Liao 已提交
2568
    }
H
Haojun Liao 已提交
2569

2570
    for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
2571 2572 2573 2574 2575
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
        STR_TO_VARSTR(str, mr.me.name);
2576
        colDataSetVal(pDst, count, str, false);
2577
      } else {  // it is a tag value
wmmhello's avatar
wmmhello 已提交
2578 2579
        STagVal val = {0};
        val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
2580
        const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val);
wmmhello's avatar
wmmhello 已提交
2581

2582 2583 2584 2585
        char* data = NULL;
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
          data = tTagValToData((const STagVal*)p, false);
        } else {
wmmhello's avatar
wmmhello 已提交
2586 2587
          data = (char*)p;
        }
2588
        colDataSetVal(pDst, count, data,
L
Liu Jicong 已提交
2589
                      (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
2590

2591 2592
        if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
            data != NULL) {
wmmhello's avatar
wmmhello 已提交
2593
          taosMemoryFree(data);
wmmhello's avatar
wmmhello 已提交
2594
        }
H
Haojun Liao 已提交
2595 2596 2597
      }
    }

2598
    count += 1;
wmmhello's avatar
wmmhello 已提交
2599
    if (++pInfo->curPos >= size) {
H
Haojun Liao 已提交
2600
      setOperatorCompleted(pOperator);
H
Haojun Liao 已提交
2601 2602 2603
    }
  }

2604 2605
  metaReaderClear(&mr);

2606
  // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
H
Haojun Liao 已提交
2607
  if (pOperator->status == OP_EXEC_DONE) {
2608
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
H
Haojun Liao 已提交
2609 2610 2611
  }

  pRes->info.rows = count;
wmmhello's avatar
wmmhello 已提交
2612
  pOperator->resultInfo.totalRows += count;
2613

2614
  return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
H
Haojun Liao 已提交
2615 2616
}

2617
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2618 2619
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2620
  taosArrayDestroy(pInfo->matchInfo.pList);
2621
  pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
D
dapan1121 已提交
2622
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2623 2624
}

S
slzhou 已提交
2625
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
X
Xiaoyu Wang 已提交
2626
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
2627
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
2628 2629 2630 2631 2632
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2633 2634 2635 2636
  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
2637
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
2638 2639 2640
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2641

H
Haojun Liao 已提交
2642 2643
  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
2644 2645 2646
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2647

2648
  pInfo->pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
2649
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2650 2651
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
2652

L
Liu Jicong 已提交
2653 2654
  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2655
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2656 2657
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

L
Liu Jicong 已提交
2658 2659
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
2660 2661

  return pOperator;
2662

2663
_error:
H
Haojun Liao 已提交
2664 2665 2666 2667 2668
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2669

dengyihao's avatar
dengyihao 已提交
2670
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2671 2672 2673 2674 2675 2676
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
D
dapan1121 已提交
2677
  int32_t                         code = 0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2678

H
Haojun Liao 已提交
2679
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2680

L
Liu Jicong 已提交
2681
  int64_t      st = taosGetTimestampUs();
2682
  void*        p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2683
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2684

D
dapan1121 已提交
2685
  if (NULL == source->dataReader || !source->multiReader) {
D
dapan1121 已提交
2686
    code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false);
D
dapan1121 已提交
2687 2688 2689
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
dengyihao's avatar
dengyihao 已提交
2690
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2691

D
dapan1121 已提交
2692
  pInfo->base.dataReader = source->dataReader;
H
Haojun Liao 已提交
2693
  STsdbReader* reader = pInfo->base.dataReader;
X
Xiaoyu Wang 已提交
2694
  bool         hasNext = false;
2695
  qTrace("tsdb/read-table-data: %p, enter next reader", reader);
D
dapan1121 已提交
2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707

  while (true) {
    code = tsdbNextDataBlock(reader, &hasNext);
    if (code != 0) {
      tsdbReleaseDataBlock(reader);
      pInfo->base.dataReader = NULL;
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
2708

H
Haojun Liao 已提交
2709
    if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2710
      tsdbReleaseDataBlock(reader);
D
dapan1121 已提交
2711
      pInfo->base.dataReader = NULL;
2712
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2713 2714 2715
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2716
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2717 2718 2719 2720
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2721
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2722 2723 2724 2725
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2726 2727

    uint32_t status = 0;
2728
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2729
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2730
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2731
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2732 2733 2734 2735 2736 2737 2738
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

2739
    pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2740

H
Haojun Liao 已提交
2741
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2742
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2743

2744
    qTrace("tsdb/read-table-data: %p, close reader", reader);
D
dapan1121 已提交
2745 2746 2747 2748
    if (!source->multiReader) {
      tsdbReaderClose(pInfo->base.dataReader);
      source->dataReader = NULL;
    }
H
Haojun Liao 已提交
2749
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2750 2751
    return pBlock;
  }
H
Haojun Liao 已提交
2752

D
dapan1121 已提交
2753 2754 2755 2756
  if (!source->multiReader) {
    tsdbReaderClose(pInfo->base.dataReader);
    source->dataReader = NULL;
  }
H
Haojun Liao 已提交
2757
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2758 2759 2760
  return NULL;
}

2761 2762 2763
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsTargetSlotId = 0;
  for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) {
H
Haojun Liao 已提交
2764
    SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
2765
    if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2766
      tsTargetSlotId = colInfo->dstSlotId;
2767 2768 2769
    }
  }

2770 2771 2772
  SArray*         pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  SBlockOrderInfo bi = {0};
  bi.order = order;
2773
  bi.slotId = tsTargetSlotId;
2774 2775 2776 2777 2778 2779 2780
  bi.nullFirst = NULL_ORDER_FIRST;

  taosArrayPush(pList, &bi);

  return pList;
}

H
Haojun Liao 已提交
2781
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2782 2783 2784 2785 2786 2787 2788
  memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  for (int i = 0; i < src->numOfCols; i++) {
    dst->colList[i] = src->colList[i];
  }
  return 0;
}
H
Haojun Liao 已提交
2789

2790
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
2791 2792 2793
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

S
slzhou 已提交
2794
  {
2795
    size_t  numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
S
slzhou 已提交
2796
    int32_t i = pInfo->tableStartIndex + 1;
H
Haojun Liao 已提交
2797
    for (; i < numOfTables; ++i) {
2798
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
S
slzhou 已提交
2799 2800 2801 2802 2803 2804
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }
2805

S
slzhou 已提交
2806 2807
  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;
2808

H
Haojun Liao 已提交
2809
  pInfo->base.dataReader = NULL;
2810

2811 2812
  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
S
slzhou 已提交
2813
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
2814
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
L
Liu Jicong 已提交
2815 2816
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);
2817

dengyihao's avatar
dengyihao 已提交
2818
  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
dengyihao's avatar
opt mem  
dengyihao 已提交
2819 2820 2821 2822 2823 2824

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  for (int32_t i = 0; i < numOfTable; ++i) {
2825 2826 2827
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
D
dapan1121 已提交
2828
    param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false;
2829
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
H
Haojun Liao 已提交
2830 2831
    blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);

2832
    taosArrayPush(pInfo->sortSourceParams, &param);
dengyihao's avatar
opt mem  
dengyihao 已提交
2833 2834

    SQueryTableDataCond cond;
H
Haojun Liao 已提交
2835
    dumpQueryTableCond(&pInfo->base.cond, &cond);
dengyihao's avatar
opt mem  
dengyihao 已提交
2836
    taosArrayPush(pInfo->queryConds, &cond);
2837 2838
  }

dengyihao's avatar
opt mem  
dengyihao 已提交
2839
  for (int32_t i = 0; i < numOfTable; ++i) {
2840
    SSortSource*                    ps = taosMemoryCalloc(1, sizeof(SSortSource));
2841
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
2842
    ps->param = param;
2843
    ps->onlyRef = true;
2844 2845 2846 2847 2848 2849
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
2850
    T_LONG_JMP(pTaskInfo->env, terrno);
2851 2852
  }

2853 2854 2855 2856 2857 2858 2859
  return TSDB_CODE_SUCCESS;
}

int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

dengyihao's avatar
dengyihao 已提交
2860
  int32_t numOfTable = taosArrayGetSize(pInfo->queryConds);
2861

2862 2863 2864 2865 2866 2867 2868
  SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
  pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
  pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
  pInfo->sortExecInfo.loops += sortExecInfo.loops;
  pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
  pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;

dengyihao's avatar
dengyihao 已提交
2869
  for (int32_t i = 0; i < numOfTable; ++i) {
2870 2871
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    blockDataDestroy(param->inputBlock);
D
dapan1121 已提交
2872 2873
    tsdbReaderClose(param->dataReader);
    param->dataReader = NULL;
2874
  }
2875 2876
  taosArrayClear(pInfo->sortSourceParams);

2877
  tsortDestroySortHandle(pInfo->pSortHandle);
dengyihao's avatar
dengyihao 已提交
2878
  pInfo->pSortHandle = NULL;
2879

dengyihao's avatar
opt mem  
dengyihao 已提交
2880 2881 2882
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) {
    SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
    taosMemoryFree(cond->colList);
2883
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2884 2885 2886
  taosArrayDestroy(pInfo->queryConds);
  pInfo->queryConds = NULL;

2887
  resetLimitInfoForNextGroup(&pInfo->limitInfo);
2888 2889 2890
  return TSDB_CODE_SUCCESS;
}

2891 2892
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2893 2894
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2895 2896 2897
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2898
  blockDataCleanup(pResBlock);
2899 2900

  while (1) {
2901
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2902 2903 2904 2905
    if (pTupleHandle == NULL) {
      break;
    }

2906 2907
    appendOneRowToDataBlock(pResBlock, pTupleHandle);
    if (pResBlock->info.rows >= capacity) {
2908 2909 2910 2911
      break;
    }
  }

2912
  bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
D
dapan1121 已提交
2913
  qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
2914
         pInfo->limitInfo.numOfOutputRows);
2915

2916
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928
}

SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
2929
    T_LONG_JMP(pTaskInfo->env, code);
2930
  }
2931

2932
  size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
S
slzhou 已提交
2933 2934
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;
2935

S
slzhou 已提交
2936
    if (tableListSize == 0) {
H
Haojun Liao 已提交
2937
      setOperatorCompleted(pOperator);
2938 2939
      return NULL;
    }
S
slzhou 已提交
2940
    pInfo->tableStartIndex = 0;
2941
    pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
2942 2943
    startGroupTableMergeScan(pOperator);
  }
2944

S
slzhou 已提交
2945 2946
  SSDataBlock* pBlock = NULL;
  while (pInfo->tableStartIndex < tableListSize) {
2947 2948 2949 2950
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

L
Liu Jicong 已提交
2951 2952
    pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity,
                                              pOperator);
S
slzhou 已提交
2953
    if (pBlock != NULL) {
H
Haojun Liao 已提交
2954
      pBlock->info.id.groupId = pInfo->groupId;
S
slzhou 已提交
2955 2956 2957
      pOperator->resultInfo.totalRows += pBlock->info.rows;
      return pBlock;
    } else {
2958
      // Data of this group are all dumped, let's try the next group
S
slzhou 已提交
2959 2960
      stopGroupTableMergeScan(pOperator);
      if (pInfo->tableEndIndex >= tableListSize - 1) {
H
Haojun Liao 已提交
2961
        setOperatorCompleted(pOperator);
S
slzhou 已提交
2962 2963
        break;
      }
2964

S
slzhou 已提交
2965
      pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
2966
      pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
S
slzhou 已提交
2967
      startGroupTableMergeScan(pOperator);
X
Xiaoyu Wang 已提交
2968
      resetLimitInfoForNextGroup(&pInfo->limitInfo);
S
slzhou 已提交
2969
    }
wmmhello's avatar
wmmhello 已提交
2970 2971
  }

2972 2973 2974
  return pBlock;
}

2975
void destroyTableMergeScanOperatorInfo(void* param) {
2976
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2977
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2978

dengyihao's avatar
dengyihao 已提交
2979 2980 2981
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2982 2983
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
D
dapan1121 已提交
2984 2985
    tsdbReaderClose(p->dataReader);
    p->dataReader = NULL;
2986
  }
H
Haojun Liao 已提交
2987

D
dapan1121 已提交
2988 2989 2990
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;

2991
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2992 2993
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2994

dengyihao's avatar
opt mem  
dengyihao 已提交
2995 2996 2997
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2998 2999
  }

3000 3001
  taosArrayDestroy(pTableScanInfo->queryConds);
  destroyTableScanBase(&pTableScanInfo->base);
3002 3003 3004 3005 3006

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
D
dapan1121 已提交
3007
  taosMemoryFreeClear(param);
3008 3009 3010 3011
}

int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
3012 3013
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
L
Liu Jicong 已提交
3014
  STableMergeScanInfo*     pInfo = pOptr->info;
H
Haojun Liao 已提交
3015
  execInfo->blockRecorder = pInfo->base.readRecorder;
3016
  execInfo->sortExecInfo = pInfo->sortExecInfo;
3017 3018 3019

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);
L
Liu Jicong 已提交
3020

3021 3022 3023
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3024
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
3025
                                                STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
3026 3027 3028 3029 3030
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3031

3032 3033 3034
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
3035
  int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
H
Haojun Liao 已提交
3036
                                     &pInfo->base.matchInfo);
H
Haojun Liao 已提交
3037 3038 3039
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3040

H
Haojun Liao 已提交
3041
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
3042
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3043
    taosArrayDestroy(pInfo->base.matchInfo.pList);
3044 3045 3046 3047
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
3048
    SExprSupp* pSup = &pInfo->base.pseudoSup;
3049 3050
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
3051 3052 3053 3054
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

H
Haojun Liao 已提交
3055 3056 3057 3058 3059 3060
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

H
Haojun Liao 已提交
3061 3062
  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
3063
  pInfo->base.readHandle = *readHandle;
3064 3065 3066

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;
3067
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
3068

3069
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3070
  pInfo->sample.seed = taosGetTimestampSec();
H
Haojun Liao 已提交
3071 3072 3073 3074 3075 3076

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
3077
  initResultSizeInfo(&pOperator->resultInfo, 1024);
H
Haojun Liao 已提交
3078
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
3079 3080
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

3081
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3082

H
Haojun Liao 已提交
3083
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
3084
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3085
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
3086

dengyihao's avatar
dengyihao 已提交
3087
  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
A
Alex Duan 已提交
3088 3089
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
3090

L
Liu Jicong 已提交
3091 3092
  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3093
  pOperator->exprSupp.numOfExprs = numOfCols;
3094

3095 3096
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
3097 3098 3099 3100 3101 3102 3103 3104 3105
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
3106 3107 3108 3109

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
3110
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
3111 3112 3113 3114 3115 3116
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName);
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
3117 3118
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
3119 3120 3121 3122 3123 3124 3125
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, scanCols) {
      if (nodeType(pNode) != QUERY_NODE_TARGET) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      STargetNode* targetNode = (STargetNode*)pNode;
      if (nodeType(targetNode->pExpr) != QUERY_NODE_COLUMN) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SColumnNode* colNode = (SColumnNode*)(targetNode->pExpr);
      if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
        supp->dbNameSlotId = targetNode->slotId;
      } else if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
        supp->stbNameSlotId = targetNode->slotId;
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}

int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, pseudoCols) {
      if (nodeType(pNode) != QUERY_NODE_TARGET) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      STargetNode* targetNode = (STargetNode*)pNode;
      if (nodeType(targetNode->pExpr) != QUERY_NODE_FUNCTION) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SFunctionNode* funcNode = (SFunctionNode*)(targetNode->pExpr);
      if (funcNode->funcType == FUNCTION_TYPE_TABLE_COUNT) {
        supp->tbCountSlotId = targetNode->slotId;
      }
    }
  }
  return TSDB_CODE_SUCCESS;
}

int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, groupTags) {
      if (nodeType(pNode) != QUERY_NODE_COLUMN) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SColumnNode* colNode = (SColumnNode*)pNode;
      if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
        supp->groupByDbName = true;
      }
      if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
        supp->groupByStbName = true;
      }
    }
  } else {
H
Haojun Liao 已提交
3187 3188
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
S
slzhou 已提交
3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216
  }
  return TSDB_CODE_SUCCESS;
}

int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t code = 0;
  code = tblCountScanGetInputs(groupTags, tableName, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return code;
  }
  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  code = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return code;
  }
  code = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
    return code;
  }
  return code;
}
S
shenglian zhou 已提交
3217

S
slzhou 已提交
3218
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3219 3220 3221
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3222
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3223
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3224
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3225 3226 3227 3228 3229 3230 3231 3232 3233

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3234
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3235 3236
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3237 3238 3239
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3240 3241 3242

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3243 3244
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3256 3257 3258
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
3259
    ASSERT(strlen(dbName));
S
slzhou 已提交
3260
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);
H
Haojun Liao 已提交
3261 3262 3263 3264

    char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);

S
slzhou 已提交
3265
    varDataSetLen(varDbName, strlen(dbName));
3266
    colDataSetVal(colInfoData, 0, varDbName, false);
S
slzhou 已提交
3267 3268 3269 3270
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
3271
    if (strlen(stbName) != 0) {
S
slzhou 已提交
3272
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
H
Haojun Liao 已提交
3273
      strncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
3274
      varDataSetLen(varStbName, strlen(stbName));
3275
      colDataSetVal(colInfoData, 0, varStbName, false);
3276
    } else {
3277
      colDataSetNULL(colInfoData, 0);
3278
    }
S
slzhou 已提交
3279 3280 3281
  }

  if (pSupp->tbCountSlotId != -1) {
S
slzhou 已提交
3282
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
3283
    colDataSetVal(colInfoData, 0, (char*)&count, false);
S
slzhou 已提交
3284 3285 3286 3287
  }
  pRes->info.rows = 1;
}

S
slzhou 已提交
3288
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
S
slzhou 已提交
3289 3290 3291
  STableCountScanSupp* pSupp = &pInfo->supp;
  SSDataBlock*         pRes = pInfo->pRes;

S
slzhou 已提交
3292
  size_t infodbTableNum;
S
slzhou 已提交
3293
  getInfosDbMeta(NULL, &infodbTableNum);
S
slzhou 已提交
3294
  size_t perfdbTableNum;
S
slzhou 已提交
3295 3296
  getPerfDbMeta(NULL, &perfdbTableNum);

D
dapan1121 已提交
3297
  if (pSupp->groupByDbName || pSupp->groupByStbName) {
S
slzhou 已提交
3298
    buildSysDbGroupedTableCount(pOperator, pInfo, pSupp, pRes, infodbTableNum, perfdbTableNum);
S
slzhou 已提交
3299 3300
    return (pRes->info.rows > 0) ? pRes : NULL;
  } else {
S
slzhou 已提交
3301
    buildSysDbFilterTableCount(pOperator, pSupp, pRes, infodbTableNum, perfdbTableNum);
S
slzhou 已提交
3302 3303 3304 3305
    return (pRes->info.rows > 0) ? pRes : NULL;
  }
}

S
slzhou 已提交
3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  if (strcmp(pSupp->dbNameFilter, TSDB_INFORMATION_SCHEMA_DB) == 0) {
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (strcmp(pSupp->dbNameFilter, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  } else if (strlen(pSupp->dbNameFilter) == 0) {
    fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
  }
  setOperatorCompleted(pOperator);
}

static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  if (pInfo->currGrpIdx == 0) {
D
dapan1121 已提交
3322 3323 3324 3325 3326 3327
    uint64_t groupId = 0;
    if (pSupp->groupByDbName) {
      groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
    } else {
      groupId = calcGroupId("", 0);
    }
X
Xiaoyu Wang 已提交
3328

S
slzhou 已提交
3329 3330 3331
    pRes->info.id.groupId = groupId;
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (pInfo->currGrpIdx == 1) {
D
dapan1121 已提交
3332 3333 3334 3335 3336 3337 3338
    uint64_t groupId = 0;
    if (pSupp->groupByDbName) {
      groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
    } else {
      groupId = calcGroupId("", 0);
    }

S
slzhou 已提交
3339 3340 3341 3342 3343 3344 3345 3346
    pRes->info.id.groupId = groupId;
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  } else {
    setOperatorCompleted(pOperator);
  }
  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3347
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
S
slzhou 已提交
3348 3349 3350 3351
  SExecTaskInfo*               pTaskInfo = pOperator->pTaskInfo;
  STableCountScanOperatorInfo* pInfo = pOperator->info;
  STableCountScanSupp*         pSupp = &pInfo->supp;
  SSDataBlock*                 pRes = pInfo->pRes;
S
slzhou 已提交
3352
  blockDataCleanup(pRes);
3353

S
slzhou 已提交
3354 3355 3356
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }
S
slzhou 已提交
3357
  if (pInfo->readHandle.mnd != NULL) {
S
slzhou 已提交
3358
    return buildSysDbTableCount(pOperator, pInfo);
S
slzhou 已提交
3359
  }
S
slzhou 已提交
3360

S
slzhou 已提交
3361 3362 3363 3364 3365
  return buildVnodeDbTableCount(pOperator, pInfo, pSupp, pRes);
}

static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
S
slzhou 已提交
3366 3367
  const char* db = NULL;
  int32_t     vgId = 0;
S
slzhou 已提交
3368
  char        dbName[TSDB_DB_NAME_LEN] = {0};
S
slzhou 已提交
3369

S
slzhou 已提交
3370 3371 3372 3373 3374 3375
  // get dbname
  vnodeGetInfo(pInfo->readHandle.vnode, &db, &vgId);
  SName sn = {0};
  tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB);
  tNameGetDbName(&sn, dbName);

D
dapan1121 已提交
3376
  if (pSupp->groupByDbName || pSupp->groupByStbName) {
S
slzhou 已提交
3377 3378 3379 3380 3381 3382 3383 3384 3385 3386 3387 3388 3389 3390
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
  } else {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
  }
  return pRes->info.rows > 0 ? pRes : NULL;
}

static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  if (pSupp->groupByStbName) {
    if (pInfo->stbUidList == NULL) {
      pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
      if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
        qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
S
slzhou 已提交
3391
      }
S
slzhou 已提交
3392 3393 3394 3395 3396 3397 3398 3399 3400 3401
    }
    if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
      tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
      buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid);

      pInfo->currGrpIdx++;
    } else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
      buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);

      pInfo->currGrpIdx++;
S
slzhou 已提交
3402
    } else {
S
slzhou 已提交
3403
      setOperatorCompleted(pOperator);
S
slzhou 已提交
3404 3405
    }
  } else {
S
slzhou 已提交
3406 3407 3408 3409 3410 3411 3412 3413 3414 3415 3416 3417 3418 3419 3420 3421 3422
    uint64_t groupId = calcGroupId(dbName, strlen(dbName));
    pRes->info.id.groupId = groupId;
    int64_t dbTableCount = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
    setOperatorCompleted(pOperator);
  }
}

static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  if (strlen(pSupp->dbNameFilter) != 0) {
    if (strlen(pSupp->stbNameFilter) != 0) {
      tb_uid_t      uid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter);
      SMetaStbStats stats = {0};
      metaGetStbStats(pInfo->readHandle.meta, uid, &stats);
      int64_t ctbNum = stats.ctbNum;
      fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, ctbNum, pRes);
S
slzhou 已提交
3423 3424 3425
    } else {
      int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
      fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
S
slzhou 已提交
3426
    }
S
slzhou 已提交
3427 3428 3429
  } else {
    int64_t tbNumVnode = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
S
slzhou 已提交
3430
  }
S
slzhou 已提交
3431 3432 3433 3434 3435 3436
  setOperatorCompleted(pOperator);
}

static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName) {
  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
D
dapan1121 已提交
3437 3438 3439
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
  }
X
Xiaoyu Wang 已提交
3440

S
slzhou 已提交
3441 3442 3443
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;
  int64_t ntbNum = metaGetNtbNum(pInfo->readHandle.meta);
3444 3445 3446
  if (ntbNum != 0) {
    fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes);
  }
S
slzhou 已提交
3447 3448 3449 3450 3451 3452 3453 3454
}

static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
  char stbName[TSDB_TABLE_NAME_LEN] = {0};
  metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);

  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
D
dapan1121 已提交
3455 3456 3457 3458 3459
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
  } else {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s", stbName);
  }
X
Xiaoyu Wang 已提交
3460

S
slzhou 已提交
3461 3462 3463 3464 3465 3466 3467 3468
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;

  SMetaStbStats stats = {0};
  metaGetStbStats(pInfo->readHandle.meta, stbUid, &stats);
  int64_t ctbNum = stats.ctbNum;

  fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
S
shenglian zhou 已提交
3469 3470 3471
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3472
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3473 3474
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3475
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3476 3477
  taosMemoryFreeClear(param);
}