scanoperator.c 128.5 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

D
dapan1121 已提交
34
// NOTE(review): presumably a table-count threshold for the multi-reader merge scan; its user is outside this chunk.
#define MULTI_READER_MAX_TABLE_NUM 5000

// Mark the given scan-info object as performing the reverse scan.
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)

// Flip a scan order in place: TSDB_ORDER_ASC becomes TSDB_ORDER_DESC, any other value becomes TSDB_ORDER_ASC.
#define SWITCH_ORDER(n) ((n) = (((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
37

H
Haojun Liao 已提交
38 39 40 41 42 43 44 45 46
// Execution statistics kept for a table merge scan.
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;  // file block load counters
  SSortExecInfo          sortExecInfo;   // sort statistics
} STableMergeScanExecInfo;

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
  uint64_t       uid;
D
dapan1121 已提交
47 48 49
  SSDataBlock*   inputBlock;  
  bool           multiReader;
  STsdbReader*   dataReader;
H
Haojun Liao 已提交
50 51
} STableMergeScanSortSourceParam;

L
Liu Jicong 已提交
52
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
53

H
Haojun Liao 已提交
54
// Decide whether the current data block should be processed when sampling.
// The sampling logic is compiled out for now, so every block is accepted.
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#endif
  return true;
}

67
// Flip the scan order stored in each of the first numOfOutput function contexts.
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SWITCH_ORDER(pCtx[idx].order);
    idx += 1;
  }
}

73 74 75 76 77 78 79 80 81
// Move the time window *tw one step forward or backward, depending on order.
//
// For every interval unit other than 'n' and 'y' the start key moves by pInterval->sliding and the end
// key is start + interval - 1. For 'n' and 'y' the window is shifted on the local calendar by whole
// months (a 'y' interval counts as 12 months each) and spans that many months; the end key is inclusive.
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int32_t direction = GET_FORWARD_DIRECTION_FACTOR(order);
  const bool    calendarUnit = (pInterval->intervalUnit == 'n') || (pInterval->intervalUnit == 'y');

  if (!calendarUnit) {
    tw->skey += pInterval->sliding * direction;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // window length expressed in months
  int64_t months = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    months *= 12;
  }

  // current start key in seconds, broken down into local calendar time
  int64_t   seconds = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;
  time_t    t = (time_t)seconds;
  struct tm tm;
  taosLocalTime(&t, &tm);

  // new start: shift the month counter by one window in the scan direction
  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + months * direction);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // new end: start of the following window, minus one tick
  mon = (int)(mon + months);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
  tw->ekey -= 1;
}

106
// Check whether the time range of a data block is covered by more than one interval window.
//
// pInterval  : interval definition; an interval of 0 means there is no interval operator upstream.
// pBlockInfo : block whose [window.skey, window.ekey] range is examined.
// order      : scan order; it selects from which end of the block the windows are walked.
//
// Returns true when a window boundary falls inside the block range, false otherwise (including
// the "no interval" case).
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  STimeWindow w = {0};

  // 0 by default, which means there is no interval operator in the upstream operator.
  if (pInterval->interval == 0) {
    return false;
  }

  if (order == TSDB_ORDER_ASC) {
    // start from the window that holds the first timestamp of the block
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
    ASSERT(w.ekey >= pBlockInfo->window.skey);

    // the first window ends before the block does
    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      ASSERT(w.ekey > pBlockInfo->window.ekey);
      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
        return true;
      }
    }
  } else {
    // start from the window that holds the last timestamp of the block
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
    ASSERT(w.skey <= pBlockInfo->window.ekey);

    // the last window starts after the block does
    if (w.skey > pBlockInfo->window.skey) {
      return true;
    }

    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      // use the same assertion macro as the rest of this function
      ASSERT(w.skey < pBlockInfo->window.skey);
      if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
        return true;
      }
    }
  }

  return false;
}

157 158 159 160 161 162 163 164 165 166 167
// Used by the table scanner to reach the temporary result row that the upstream aggregate keeps for a group.
//
// Returns NULL when the operator is not a table scan, when no result row is registered for groupId,
// or when the buffer page cannot be obtained. On success *pPage is set to the page holding the row.
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  STableScanInfo* pScan = pOperator->info;

  // build the hash key for this group
  int64_t key[2] = {0};
  SET_RES_WINDOW_KEY((char*)key, &groupId, sizeof(groupId), groupId);

  SResultRowPosition* pPos = (SResultRowPosition*)tSimpleHashGet(
      pScan->base.pdInfo.pAggSup->pResultRowHashTable, key, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
  if (pPos == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pScan->base.pdInfo.pAggSup->pResultBuf, pPos->pageId);
  if (*pPage == NULL) {
    return NULL;
  }

  return (SResultRow*)((char*)(*pPage) + pPos->offset);
}

// Ask every pushed-down function whether, given its current result for the block's group, the block
// still has to be loaded. When all of them answer FUNC_DATA_REQUIRED_NOT_LOAD, *status is set to
// FUNC_DATA_REQUIRED_NOT_LOAD; otherwise *status is left untouched.
// Always returns TSDB_CODE_SUCCESS.
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScan = pOperator->info;
  SExprSupp*      pSup = pScan->base.pdInfo.pExprSup;

  // nothing has been pushed down to this scan
  if (pSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pPage = NULL;
  SResultRow* pResRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
  if (pResRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // stop at the first function that still needs the data
  bool canSkip = true;
  for (int32_t i = 0; canSkip && i < pSup->numOfExprs; ++i) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pResRow, i, pSup->rowEntryInfoOffset);
    int32_t              required = fmFuncDynDataRequired(pSup->pCtx[i].functionId, pEntry, &pBlockInfo->window);
    canSkip = (required == FUNC_DATA_REQUIRED_NOT_LOAD);
  }

  // release buffer pages
  releaseBufPage(pScan->base.pdInfo.pAggSup->pResultBuf, pPage);

  if (canSkip) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
222
// Evaluate the filter against the block-level aggregates (SMA) of a data block.
// Returns true when the block must be kept; a missing filter or missing aggregates always keep the block.
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                               int32_t numOfRows) {
  if (pFilterInfo != NULL && pColsAgg != NULL) {
    return filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
  }

  return true;
}

H
Haojun Liao 已提交
232
// Load the block-level aggregates (SMA) of the current block from the reader.
// Returns true only when every column has an aggregate; a reader error aborts the task via long jump.
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  bool    everyColHasAgg = true;
  int32_t rc = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, pBlock, &everyColHasAgg);
  if (rc != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }

  return everyColHasAgg;
}

H
Haojun Liao 已提交
245
// Fill the tag/pseudo columns of pBlock for the given number of rows, if the scan has any.
// A TSDB_CODE_PAR_TABLE_NOT_EXIST failure is tolerated; any other failure aborts the task via long jump.
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
                               int32_t rows) {
  SExprSupp* pPseudo = &pTableScanInfo->pseudoSup;
  if (pPseudo->numOfExprs <= 0) {
    return;
  }

  int32_t rc = addTagPseudoColumnData(&pTableScanInfo->readHandle, pPseudo->pExprInfo, pPseudo->numOfExprs, pBlock,
                                      rows, GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);

  // the table may have been dropped while the scan is running, so "table not exists" is not fatal
  if (rc != TSDB_CODE_SUCCESS && rc != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }

  // reset the error code.
  terrno = 0;
}

262
// Apply the remaining OFFSET and the LIMIT of pLimitInfo to pBlock and update the bookkeeping.
//
// Rows still covered by the offset are dropped from the head of the block; when the whole block is
// consumed by the offset it is emptied and false is returned. When the limit is reached the block is
// truncated to the rows that still fit and true is returned. Otherwise returns false.
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  const char* id = GET_TASKID(pTaskInfo);
  SLimit*     pLimit = &pLimitInfo->limit;

  if (pLimitInfo->remainOffset > 0) {
    if (pLimitInfo->remainOffset < pBlock->info.rows) {
      // only the head of the block falls into the offset
      blockDataTrimFirstRows(pBlock, pLimitInfo->remainOffset);
      pLimitInfo->remainOffset = 0;
    } else {
      // the whole block is skipped
      pLimitInfo->remainOffset -= pBlock->info.rows;
      blockDataEmpty(pBlock);
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
      return false;
    }
  }

  // -1 means no limit
  const bool limitReached =
      (pLimit->limit != -1) && (pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows));
  if (limitReached) {
    // limit the output rows
    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, keep);
  }

  pLimitInfo->numOfOutputRows += pBlock->info.rows;

  if (limitReached) {
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
  }
  return limitReached;
}

H
Haojun Liao 已提交
292
// Decide how much of the current data block is needed and load it accordingly.
//
// pOperator      : scan operator; its filter (exprSupp.pFilterInfo) is applied to loaded data.
// pTableScanInfo : scan state (reader, cost recorder, limit info, pushed-down info).
// pBlock         : block to be filled.
// status         : out, the resulting FUNC_DATA_REQUIRED_* value. FUNC_DATA_REQUIRED_FILTEROUT tells
//                  the caller to drop the block.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the reader fails to deliver the block data.
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SFileBlockLoadRecorder* pRecorder = &pTableScanInfo->readRecorder;
  SDataBlockInfo*         pInfo = &pBlock->info;
  bool                    smaTried = false;

  pRecorder->totalBlocks += 1;
  pRecorder->totalRows += pBlock->info.rows;

  // a filter, or a block spanning several interval windows, requires the raw data
  *status = pTableScanInfo->dataBlockLoadFlag;
  if (pOperator->exprSupp.pFilterInfo != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
    pRecorder->filterOutBlocks += 1;
    // NOTE(review): the rows of this block were already added to totalRows above; confirm that counting
    // them a second time here is intended.
    pRecorder->totalRows += pBlock->info.rows;
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  }

  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d, uid:%" PRIu64, GET_TASKID(pTaskInfo),
           pInfo->window.skey, pInfo->window.ekey, pInfo->rows, pInfo->id.uid);
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
    pRecorder->skipBlocks += 1;
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  }

  if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
    pRecorder->loadBlockStatis += 1;
    smaTried = true;  // remember that the SMA load has been attempted

    if (doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
      // the block statistics are sufficient, no need to touch the raw data
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
             pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
      tsdbReleaseDataBlock(pTableScanInfo->dataReader);
      return TSDB_CODE_SUCCESS;
    }

    // block statistics are not available for all columns, load the data block instead
    qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
    *status = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (pOperator->exprSupp.pFilterInfo != NULL && (!smaTried)) {
    if (doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
      size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      bool   keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, numOfCols, pInfo->rows);
      if (!keep) {
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
               pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
        pRecorder->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

        tsdbReleaseDataBlock(pTableScanInfo->dataReader);
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results
  doDynamicPruneDataBlock(pOperator, pInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pInfo->window.skey, pInfo->window.ekey, pInfo->rows);
    pRecorder->skipBlocks += 1;
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
    *status = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }

  pRecorder->totalCheckedRows += pBlock->info.rows;
  pRecorder->loadBlocks += 1;

  // load the data of the block
  SSDataBlock* pLoaded = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pLoaded == NULL) {
    return terrno;
  }

  ASSERT(pLoaded == pBlock);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);

  // take back the rows counted at the top; the rows left after filter/limit are added at the end
  pRecorder->totalRows -= pBlock->info.rows;

  if (pOperator->exprSupp.pFilterInfo != NULL) {
    int64_t begin = taosGetTimestampUs();
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);

    double elapsedMs = (taosGetTimestampUs() - begin) / 1000.0;
    pRecorder->filterTime += elapsedMs;

    if (pBlock->info.rows == 0) {
      pRecorder->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pInfo->window.skey, pInfo->window.ekey, pInfo->rows, elapsedMs);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), elapsedMs);
    }
  }

  // once the limit is reached the operator is flagged as completed
  if (applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo)) {
    setOperatorCompleted(pOperator);
  }

  pRecorder->totalRows += pBlock->info.rows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
413
// Switch the scan state to descending order: set the reverse-scan flag, flip the order of the first
// numOfOutput function contexts, set the query order to DESC and swap the two ends of the query window.
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);

  pTableScanInfo->cond.order = TSDB_ORDER_DESC;

  STimeWindow* pQueryWin = &pTableScanInfo->cond.twindows;
  TSWAP(pQueryWin->skey, pQueryWin->ekey);
}

422 423
typedef struct STableCachedVal {
  const char* pName;
424
  STag*       pTags;
425 426
} STableCachedVal;

427 428 429 430 431 432 433 434 435 436 437
// Release a STableCachedVal together with the name and tag buffers it holds. NULL is accepted.
static void freeTableCachedVal(void* param) {
  STableCachedVal* pCached = param;
  if (pCached != NULL) {
    taosMemoryFree((void*)pCached->pName);
    taosMemoryFree(pCached->pTags);
    taosMemoryFree(pCached);
  }
}

H
Haojun Liao 已提交
438 439
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
440
  pVal->pName = taosStrdup(pMetaReader->me.name);
H
Haojun Liao 已提交
441 442 443 444
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
445
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
446 447 448 449 450 451 452
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

453 454
// Deleter callback handed to the LRU cache (const void *key, size_t keyLen, void *value):
// only the value needs to be released, the key arguments are not used.
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  freeTableCachedVal(value);
}
455

456 457 458 459 460
// Set rows [0, pBlock->info.rows) of the destination column of every expression to NULL.
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
  const SExprInfo* pEnd = pExpr + numOfExpr;
  for (const SExprInfo* pCur = pExpr; pCur < pEnd; ++pCur) {
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, pCur->base.resSchema.slotId);
    colDataSetNNULL(pDstCol, 0, pBlock->info.rows);
  }
}

465 466
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
467
  // currently only the tbname pseudo column
468
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
469
    return TSDB_CODE_SUCCESS;
470 471
  }

472 473
  int32_t code = 0;

474 475 476 477
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

478
  bool            freeReader = false;
479
  STableCachedVal val = {0};
480 481

  SMetaReader mr = {0};
482
  LRUHandle*  h = NULL;
483

484 485 486
  // todo refactor: extract method
  // the handling of the null data should be packed in the extracted method

487
  // 1. check if it is existed in meta cache
488
  if (pCache == NULL) {
489
    metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
490
    code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
491
    if (code != TSDB_CODE_SUCCESS) {
492
      // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
H
Haojun Liao 已提交
493
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
S
slzhou 已提交
494 495
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);
496 497 498

        // append null value before return to caller, since the caller will ignore this error code and proceed
        doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
499
      } else {
S
slzhou 已提交
500 501
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
H
Haojun Liao 已提交
502
      }
503 504 505 506 507
      metaReaderClear(&mr);
      return terrno;
    }

    metaReaderReleaseLock(&mr);
508

509 510
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
511 512

    freeReader = true;
513
  } else {
514 515
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
516
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
517 518
    if (h == NULL) {
      metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
519
      code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
520
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
521
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
522
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
523
                pBlock->info.id.uid, tstrerror(terrno), idStr);
524 525
          // append null value before return to caller, since the caller will ignore this error code and proceed
          doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
526
        } else {
H
Haojun Liao 已提交
527
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
528
                 idStr);
H
Haojun Liao 已提交
529
        }
530 531 532 533 534 535
        metaReaderClear(&mr);
        return terrno;
      }

      metaReaderReleaseLock(&mr);

H
Haojun Liao 已提交
536
      STableCachedVal* pVal = createTableCacheVal(&mr);
537

H
Haojun Liao 已提交
538
      val = *pVal;
539
      freeReader = true;
H
Haojun Liao 已提交
540

H
Haojun Liao 已提交
541
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
542
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
543 544 545 546 547 548 549 550
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
551

H
Haojun Liao 已提交
552
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
553
    }
H
Haojun Liao 已提交
554

555 556
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
557
  }
558

559 560
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
561
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
562 563

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
564
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
565

566
    int32_t functionId = pExpr1->pExpr->_function.functionId;
567 568 569

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
570
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
571
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
572
      STagVal tagVal = {0};
573 574
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
575

576 577 578 579
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
580
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
581
      }
582

H
Haojun Liao 已提交
583 584
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
585
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
586
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
587
        colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows);
H
Haojun Liao 已提交
588 589 590
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
L
Liu Jicong 已提交
591
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
592
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
593
          colDataSetVal(pColInfoData, i, data, false);
H
Haojun Liao 已提交
594
        }
595 596 597 598
      }
    }
  }

599 600
  // restore the rows
  pBlock->info.rows = backupRows;
601 602 603 604
  if (freeReader) {
    metaReaderClear(&mr);
  }

H
Haojun Liao 已提交
605
  return TSDB_CODE_SUCCESS;
606 607
}

H
Haojun Liao 已提交
608
// Fill pColInfoData with the table name by running the scalar function identified by functionId
// over a one-row VARCHAR column that holds `name`. pBlock supplies the number of rows handed to the
// function. When the function has no process callback an error is logged and the column is not filled.
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
  struct SScalarFuncExecFuncs execFuncs = {0};
  fmGetScalarFuncExecFuncs(functionId, &execFuncs);

  // the name in var-string form
  char         varName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  const size_t nameBytes = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
  STR_TO_VARSTR(varName, name)

  // temporary one-row source column
  SColumnInfoData nameCol = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, nameBytes, 1);
  colInfoDataEnsureCapacity(&nameCol, 1, false);
  colDataSetVal(&nameCol, 0, varName, false);

  SScalarParam input = {.numOfRows = pBlock->info.rows, .columnData = &nameCol};
  SScalarParam output = {.columnData = pColInfoData};

  if (execFuncs.process == NULL) {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  } else {
    execFuncs.process(&input, 1, &output);
  }

  colDataDestroy(&nameCol);
}

633
// Fetch data blocks from the reader until one of them yields rows, and return it.
// Returns NULL when the reader has no more blocks or the operator has been completed.
// A killed task or a load failure aborts the task via long jump.
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  STableScanBase* pBase = &pScan->base;
  SSDataBlock*    pRes = pScan->pResBlock;

  int64_t startUs = taosGetTimestampUs();

  while (tsdbNextDataBlock(pBase->dataReader)) {
    if (isTaskKilled(pTaskInfo)) {
      tsdbReleaseDataBlock(pBase->dataReader);
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (pOperator->status == OP_EXEC_DONE) {
      tsdbReleaseDataBlock(pBase->dataReader);
      break;
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pScan->sample)) {
      continue;
    }

    ASSERT(pRes->info.id.uid != 0);
    pRes->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pRes->info.id.uid);

    uint32_t loadStatus = 0;
    int32_t  rc = loadDataBlock(pOperator, pBase, pRes, &loadStatus);
    if (rc != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, rc);
    }

    // current block is filter out according to filter condition, continue load the next block
    if (loadStatus == FUNC_DATA_REQUIRED_FILTEROUT || pRes->info.rows == 0) {
      continue;
    }

    // update the statistics of the operator
    pOperator->resultInfo.totalRows = pBase->readRecorder.totalRows;
    pBase->readRecorder.elapsedTime += (taosGetTimestampUs() - startUs) / 1000.0;
    pOperator->cost.totalCost = pBase->readRecorder.elapsedTime;

    // todo refactor
    // record the position of the returned block in the stream status of the task
    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
    pTaskInfo->streamInfo.lastStatus.uid = pRes->info.id.uid;
    pTaskInfo->streamInfo.lastStatus.ts = pRes->info.window.ekey;

    ASSERT(pRes->info.id.uid != 0);
    return pRes;
  }

  return NULL;
}

H
Haojun Liao 已提交
689
// Run the scan passes for the table set currently bound to the reader: first scanInfo.numOfAsc
// ascending passes, then scanInfo.numOfDesc descending ones. Returns the next block with data, or NULL
// once all passes are finished (also when there is no reader or the operator is already done).
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  STableScanBase* pBase = &pScan->base;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pBase->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // do the ascending order traverse in the first place.
  while (pScan->scanTimes < pScan->scanInfo.numOfAsc) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    // this pass is exhausted
    pScan->scanTimes += 1;
    if (pScan->scanTimes >= pScan->scanInfo.numOfAsc) {
      continue;
    }

    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = MAIN_SCAN;
    pBase->dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
    qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));

    // do prepare for the next round table scan operation
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
  }

  int32_t totalPasses = pScan->scanInfo.numOfAsc + pScan->scanInfo.numOfDesc;
  if (pScan->scanTimes >= totalPasses) {
    return NULL;
  }

  // switch to descending order, unless a previous call has already done so
  if (pBase->cond.order == TSDB_ORDER_ASC) {
    prepareForDescendingScan(pBase, pOperator->exprSupp.pCtx, 0);
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
  }

  while (pScan->scanTimes < totalPasses) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    // this pass is exhausted
    pScan->scanTimes += 1;
    if (pScan->scanTimes >= totalPasses) {
      continue;
    }

    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = MAIN_SCAN;

    qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
  }

  return NULL;
}

/*
 * Produce the next data block of a table scan operator.
 *
 * Two modes are supported:
 *  - TABLE_SCAN__TABLE_ORDER: tables are read one at a time. When the current table yields no block, the reader is
 *    pointed at the next table of the task's table list and the scan is restarted.
 *  - otherwise: tables are read group by group. The reader is opened for the first group when currentGroupId is -1,
 *    and is re-targeted at the following group every time the current group is drained.
 *
 * @param pOperator  the table scan operator; pOperator->info must be a STableScanInfo
 * @return the next block, or NULL when no more data is available
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // scan table one by one sequentially
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);

    while (1) {
      SSDataBlock* result = doGroupedTableScan(pOperator);
      if (result || (pOperator->status == OP_EXEC_DONE)) {
        return result;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;
      if (pInfo->currentTable >= numOfTables) {
        return NULL;
      }

      STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
      tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
             pInfo->currentTable, pTaskInfo->id.str);

      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;
    }
  } else {  // scan table group by group sequentially
    if (pInfo->currentGroupId == -1) {
      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
        setOperatorCompleted(pOperator);
        return NULL;
      }

      int32_t        num = 0;
      STableKeyInfo* pList = NULL;
      tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
      ASSERT(pInfo->base.dataReader == NULL);

      int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
                                    (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }

      if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
        pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
      }
    }

    // Keep moving to the next group until one of them produces a block. Stopping after the first empty
    // follow-up group would silently skip every group that comes after it.
    while (1) {
      SSDataBlock* result = doGroupedTableScan(pOperator);
      if (result != NULL) {
        ASSERT(result->info.id.uid != 0);
        return result;
      }

      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
        setOperatorCompleted(pOperator);
        return NULL;
      }

      // reset value for the next group data output
      pOperator->status = OP_OPENED;
      resetLimitInfoForNextGroup(&pInfo->base.limitInfo);

      int32_t        num = 0;
      STableKeyInfo* pList = NULL;
      tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);

      tsdbSetTableList(pInfo->base.dataReader, pList, num);
      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;
    }
  }
}

831 832
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
833
  STableScanInfo*         pTableScanInfo = pOptr->info;
H
Haojun Liao 已提交
834
  *pRecorder = pTableScanInfo->base.readRecorder;
835 836 837 838 839
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

840
static void destroyTableScanOperatorInfo(void* param) {
841
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
H
Haojun Liao 已提交
842
  blockDataDestroy(pTableScanInfo->pResBlock);
H
Haojun Liao 已提交
843
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
H
Haojun Liao 已提交
844

H
Haojun Liao 已提交
845 846
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
847

H
Haojun Liao 已提交
848 849
  if (pTableScanInfo->base.matchInfo.pList != NULL) {
    taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
850
  }
L
Liu Jicong 已提交
851

H
Haojun Liao 已提交
852 853
  taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
D
dapan1121 已提交
854
  taosMemoryFreeClear(param);
855 856
}

857
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
858
                                           SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
859 860 861
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
862
    goto _error;
H
Haojun Liao 已提交
863 864
  }

865
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
866
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
867 868

  int32_t numOfCols = 0;
869
  int32_t code =
H
Haojun Liao 已提交
870
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
871 872 873 874
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
875
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
876
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
877
  if (code != TSDB_CODE_SUCCESS) {
878
    goto _error;
879 880
  }

H
Haojun Liao 已提交
881
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
882
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
883
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
884
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
885 886
  }

887
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
G
Ganlin Zhao 已提交
888
  pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
H
Haojun Liao 已提交
889

H
Haojun Liao 已提交
890 891
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
892 893
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

894 895
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
896

H
Haojun Liao 已提交
897
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
898
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
X
Xiaoyu Wang 已提交
899
  //  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
900

H
Haojun Liao 已提交
901 902 903
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
904 905
  }

wmmhello's avatar
wmmhello 已提交
906
  pInfo->currentGroupId = -1;
907
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
908
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
909

L
Liu Jicong 已提交
910 911
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
912
  pOperator->exprSupp.numOfExprs = numOfCols;
913

H
Haojun Liao 已提交
914 915
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
916 917 918
    code = terrno;
    goto _error;
  }
919

H
Haojun Liao 已提交
920
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
921 922
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
923 924 925

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
926
  return pOperator;
927

928
_error:
929 930 931
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
932

933 934
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
935
  return NULL;
H
Haojun Liao 已提交
936 937
}

938
/*
 * Build a sequential table scan operator on top of an already opened reader.
 *
 * @param pReadHandle  reader handle; stored as the operator's data reader
 * @param pTaskInfo    owning task; on allocation failure its `code` field is set to TSDB_CODE_OUT_OF_MEMORY
 * @return the new operator, or NULL if memory could not be allocated
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // either allocation may have succeeded; release whichever did and report the failure
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->base.dataReader = pReadHandle;

  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
  return pOperator;
}

951
/*
 * Drop every buffered block of the stream scanner and rewind its block cursor.
 *
 * @param pInfo  stream scan state whose pBlockLists array is cleared
 */
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  // log how many blocks are about to be discarded
  qDebug("clear buff blocks:%d", (int32_t)taosArrayGetSize(pInfo->pBlockLists));

  pInfo->validBlockIndex = 0;
  taosArrayClear(pInfo->pBlockLists);
}

957
/* True when the stream scanner's parent node type is the stream session node. */
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION) {
    return true;
  }
  return false;
}

961
/* True when the stream scanner's parent node type is the stream state node. */
static bool isStateWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
    return true;
  }
  return false;
}
5
54liuyao 已提交
964

L
Liu Jicong 已提交
965
/* True when the stream scanner's parent is one of the stream interval nodes (plain, semi or final). */
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
      return true;
    default:
      return false;
  }
}

/* True only when the stream scanner's parent is the plain stream interval node (not semi/final). */
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
    return true;
  }
  return false;
}

975 976 977 978
/* True for an interval window whose sliding step differs from its interval length. */
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.interval != pInfo->interval.sliding;
}

979
/*
 * Store in pInfo->groupId the value found at (groupColIndex, rowIndex) of pBlock.
 * The column data is read as an array of uint64_t.
 */
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  const uint64_t*  groupIds = (uint64_t*)pGroupCol->pData;

  ASSERT(rowIndex < pBlock->info.rows);
  pInfo->groupId = groupIds[rowIndex];
}

L
Liu Jicong 已提交
986
/*
 * Re-arm a table scan for a new time window.
 *
 * The scan counters are reset, the query condition's time window is replaced by *pWin and the current data reader
 * is closed and forgotten.
 *
 * @param pTableScanInfo  scan state to reset
 * @param pWin            time window to scan next
 */
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
  // start over: no group selected yet, no scan pass done yet
  pTableScanInfo->currentGroupId = -1;
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->base.cond.twindows = *pWin;

  // close the reader and clear the pointer so the closed handle is never reused
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  qDebug("1");
  pTableScanInfo->base.dataReader = NULL;
}

L
Liu Jicong 已提交
995 996
/*
 * Read the rows of one table in [startTs, endTs] as visible up to version `maxVersion`.
 *
 * A temporary reader is opened on a copy of the scan operator's query condition (time window and version range
 * overridden), at most one data block is retrieved into the operator's result block, and the reader is closed.
 *
 * @param pTableScanOp  table scan operator providing the condition, read handle and result block
 * @param tbUid         uid of the table to read
 * @param startTs       first timestamp of the range
 * @param endTs         last timestamp of the range
 * @param maxVersion    highest version to read
 * @return the operator's result block when it holds at least one row, otherwise NULL
 */
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  STableScanInfo* pScan = pTableScanOp->info;
  SExecTaskInfo*  pTaskInfo = pTableScanOp->pTaskInfo;
  SSDataBlock*    pResBlock = pScan->pResBlock;

  // work on a private copy of the condition so that the operator's own condition is left untouched
  SQueryTableDataCond cond = pScan->base.cond;
  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};
  STsdbReader*  pReader = NULL;

  int32_t code =
      tsdbReaderOpen(pScan->base.readHandle.vnode, &cond, &tblInfo, 1, pResBlock, &pReader, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  if (tsdbNextDataBlock(pReader)) {
    tsdbRetrieveDataBlock(pReader, NULL);
    doSetTagColumnData(&pScan->base, pResBlock, pTaskInfo, pResBlock->info.rows);
    pResBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pResBlock->info.id.uid);
  }

  tsdbReaderClose(pReader);
  qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
         ", suid:%" PRIu64,
         pResBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);

  if (pResBlock->info.rows > 0) {
    return pResBlock;
  }
  return NULL;
}

/*
 * Compute the group id of the row stored for (uid, ts) at version `maxVersion`.
 *
 * @return the id calculated from the stored row, or 0 when no row is found
 */
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  // the lookup range is the single timestamp `ts`
  SSDataBlock* pStored = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (pStored == NULL || pStored->info.rows == 0) {
    return 0;
  }

  ASSERT(pStored->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pStored, 0);
}

5
54liuyao 已提交
1041
/* Look up the group id of table `uid` in the table list of the scan operator's task. */
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  SExecTaskInfo* pTaskInfo = pInfo->pTableScanOp->pTaskInfo;
  return getTableGroupId(pTaskInfo->pTableInfoList, uid);
}

5
54liuyao 已提交
1045 1046 1047 1048 1049 1050 1051 1052
/*
 * Resolve the group id of a row: calculated from the stored row when the partition support requires a
 * calculation, otherwise taken from the table list by uid.
 */
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1053
/*
 * Set up the table scan for the next range described by pBlock, starting at row *pRowIndex.
 *
 * The row at *pRowIndex provides the initial window and group id. Following rows of the same group that share the
 * window's start key (or whose end key equals it) are merged into the window, and *pRowIndex is advanced past them.
 * The table scan is then reset to the merged window.
 *
 * @param pInfo      stream scan state; groupId and (for sliding windows) updateWin are updated
 * @param pBlock     block holding start/end/group/calculate-start/calculate-end columns
 * @param pRowIndex  [in,out] current row cursor inside pBlock
 * @return false when pBlock has no rows left to consume, true when a range has been prepared
 */
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  if (pBlock->info.rows == 0 || (*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);

  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  TSKEY*    startData = (TSKEY*)pStartTsCol->pData;
  TSKEY*    endData = (TSKEY*)pEndTsCol->pData;
  uint64_t* gpData = (uint64_t*)pGpCol->pData;
  TSKEY*    calStartData = (TSKEY*)pCalStartTsCol->pData;
  TSKEY*    calEndData = (TSKEY*)pCalEndTsCol->pData;

  // the row under the cursor seeds the window and the group
  const int32_t first = *pRowIndex;
  STimeWindow   win = {.skey = startData[first], .ekey = endData[first]};
  uint64_t      groupId = gpData[first];

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, first);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[first];
    pInfo->updateWin.ekey = calEndData[first];
  }

  // absorb the following rows that extend the same window of the same group
  for ((*pRowIndex)++; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
    const bool sameGroup = (groupId == gpData[*pRowIndex]);

    if (sameGroup && win.skey == startData[*pRowIndex]) {
      win.ekey = TMAX(win.ekey, endData[*pRowIndex]);
      continue;
    }

    if (sameGroup && win.skey == endData[*pRowIndex]) {
      win.skey = TMIN(win.skey, startData[*pRowIndex]);
      continue;
    }

    ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
           !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
    break;
  }

  resetTableScanInfo(pInfo->pTableScanOp->info, &win);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1102
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1103
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1104
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1105
  dumyInfo.cur.pageId = -1;
1106
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1107 1108
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1109
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1110

5
54liuyao 已提交
1111
  while (1) {
1112 1113 1114
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1115
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1116 1117 1118 1119 1120
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1121
    }
5
54liuyao 已提交
1122

5
54liuyao 已提交
1123 1124 1125
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1126
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1127
    endWin = preWin;
5
54liuyao 已提交
1128
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1129 1130 1131 1132 1133 1134
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1135

L
Liu Jicong 已提交
1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146
/*
 * Return the next block of the range scan whose rows belong to pInfo->groupId.
 *
 * The table scan is drained first; when it has nothing left, the next range is taken from pSDB via
 * prepareRangeScan() and the scan is retried. When no block can be produced, pSDB is cleaned up, *pRowIndex and
 * pInfo->updateWin are reset, the table scan's reader is closed and NULL is returned.
 *
 * @param pInfo       stream scan state
 * @param pSDB        block listing the ranges to scan
 * @param tsColIndex  not used by this function
 * @param pRowIndex   [in,out] cursor into pSDB
 * @return a block carrying pInfo->updateWin as its calWin, or NULL when all ranges are exhausted
 */
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  for (;;) {
    SSDataBlock* pRes = doTableScan(pInfo->pTableScanOp);
    if (pRes == NULL && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
      // scan next window data
      pRes = doTableScan(pInfo->pTableScanOp);
    }

    if (pRes == NULL) {
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};

      STableScanInfo* pScan = pInfo->pTableScanOp->info;
      tsdbReaderClose(pScan->base.dataReader);
      qDebug("2");
      pScan->base.dataReader = NULL;
      return NULL;
    }

    doFilter(pRes, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
    if (pRes->info.rows == 0) {
      continue;
    }

    if (!pInfo->partitionSup.needCalc) {
      // the whole block belongs to one group: either it is the wanted one or the block is skipped
      if (pRes->info.id.groupId != pInfo->groupId) {
        continue;
      }
      pRes->info.calWin = pInfo->updateWin;
      return pRes;
    }

    // group ids are calculated per row: rebuild pRes from a copy, keeping only the rows of the wanted group
    SSDataBlock* pCopy = createOneDataBlock(pRes, true);
    blockDataCleanup(pRes);

    for (int32_t row = 0; row < pCopy->info.rows; row++) {
      if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pCopy, row) != pInfo->groupId) {
        continue;
      }

      for (int32_t col = 0; col < pInfo->pTableScanOp->exprSupp.numOfExprs; col++) {
        SColumnInfoData* pSrcCol = taosArrayGet(pCopy->pDataBlock, col);
        SColumnInfoData* pDestCol = taosArrayGet(pRes->pDataBlock, col);
        bool             isNull = colDataIsNull(pSrcCol, pCopy->info.rows, row, NULL);
        char*            pSrcData = colDataGetData(pSrcCol, row);
        colDataSetVal(pDestCol, pRes->info.rows, pSrcData, isNull);
      }
      pRes->info.rows++;
    }

    blockDataDestroy(pCopy);

    if (pRes->info.rows > 0) {
      pRes->info.calWin = pInfo->updateWin;
      return pRes;
    }
  }
}
1188

1189
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
X
Xiaoyu Wang 已提交
1190
                                   SSessionKey* pKey) {
1191 1192 1193
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
X
Xiaoyu Wang 已提交
1194

1195 1196 1197 1198 1199
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
  int32_t          code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
D
dapan1121 已提交
1200 1201

  taosMemoryFree(pCur);
1202 1203 1204
  return code;
}

1205
/*
 * Translate the ranges of pSrcBlock into session-window scan ranges stored in pDestBlock.
 *
 * For every source row the session windows containing its start and end timestamps are looked up. A row is
 * skipped when no window can be found for either end. Each emitted row holds the start key of the start window,
 * the end key of the end window and the group id; the uid and calculate-start/end columns are set to NULL.
 *
 * @param pInfo       stream scan state
 * @param pSrcBlock   block with start ts / end ts / uid columns
 * @param pDestBlock  [out] cleaned up first, then filled with one row per resolved range
 * @return TSDB_CODE_SUCCESS, or the error returned while reserving capacity in pDestBlock
 */
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int64_t          version = pSrcBlock->info.version - 1;

  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }

    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // no window contains the end timestamp: fall back to the window located before it
      getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    }
    // startWin was already validated above; it is the end window that may still be unresolved here
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // window has been closed.
      qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
      continue;
    }

    // Write at the destination row count rather than the source index: source rows may be skipped above, and
    // info.rows only counts the rows that were actually emitted.
    colDataSetVal(pDestStartCol, pDestBlock->info.rows, (const char*)&startWin.win.skey, false);
    colDataSetVal(pDestEndCol, pDestBlock->info.rows, (const char*)&endWin.win.ekey, false);

    colDataSetNULL(pDestUidCol, pDestBlock->info.rows);
    colDataSetVal(pDestGpCol, pDestBlock->info.rows, (const char*)&groupId, false);
    colDataSetNULL(pDestCalStartTsCol, pDestBlock->info.rows);
    colDataSetNULL(pDestCalEndTsCol, pDestBlock->info.rows);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1259 1260 1261 1262 1263 1264

/*
 * Translate the ranges of pSrcBlock into interval-window scan ranges stored in pDestBlock.
 *
 * When group ids must be calculated from data and the first source row spans more than one timestamp, pSrcBlock
 * is first rewritten: the rows stored for that range are read back and one (ts, ts, uid, groupId) row is appended
 * per stored row. Then, for each run of source rows, one destination row is emitted with the merged window, the
 * uid, the group id and the calculate-start/end timestamps.
 *
 * @param pInfo       stream scan state
 * @param pSrcBlock   block with start ts / end ts / uid / group id columns; may be rewritten (see above)
 * @param pDestBlock  [out] cleaned up first, then filled with the scan ranges
 * @return TSDB_CODE_SUCCESS, or the error returned while reserving capacity in one of the blocks
 */
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t          code = TSDB_CODE_SUCCESS;
  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);

  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
    uint64_t     srcUid = srcUidData[0];
    TSKEY        startTs = srcStartTsCol[0];
    TSKEY        endTs = srcEndTsCol[0];
    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);
    if (pPreRes == NULL) {
      // readPreVersionData() returns NULL when no row is stored for the range: there is nothing to expand,
      // so the source block ends up empty and no scan range is generated.
      blockDataCleanup(pSrcBlock);
      return TSDB_CODE_SUCCESS;
    }

    printDataBlock(pPreRes, "pre res");
    blockDataCleanup(pSrcBlock);
    code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    rows = pPreRes->info.rows;

    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
      appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
                                       &groupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }

  // re-read the data pointers: the source block may have been rewritten above
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  srcUidData = (uint64_t*)pSrcUidCol->pData;

  code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  // `i` is advanced by getSlidingWindow(), which consumes one run of rows per iteration
  for (int32_t i = 0; i < rows;) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
    TSKEY calStartTs = srcStartTsCol[i];
    colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
                                       pInfo->partitionSup.needCalc);
    TSKEY       calEndTs = srcStartTsCol[i - 1];
    colDataSetVal(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataSetVal(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1336

1337
/*
 * Copy the delete ranges of pSrcBlock into pDestBlock, resolving missing group ids and, when table-name
 * expressions are configured, attaching the partition table name stored for the group.
 *
 * @param pInfo       stream scan state
 * @param pSrcBlock   block with start ts / end ts / uid / group id columns
 * @param pDestBlock  [out] cleaned up first, then filled with one row per source row
 * @return TSDB_CODE_SUCCESS, or the error returned while reserving capacity in pDestBlock
 */
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    // var-string buffer (length header followed by the name bytes); it must be a byte buffer, not an array of
    // pointers, so that the header/value macros address it correctly
    char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
    if (pInfo->tbnameCalSup.pExprInfo) {
      void* parTbname = NULL;
      streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);

      // the lookup leaves parTbname NULL when it yields nothing; copying from NULL would be undefined
      if (parTbname != NULL) {
        memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
        varDataSetLen(tbname, strlen(varDataVal(tbname)));
        tdbFree(parTbname);
      }
    }
    // an all-zero first byte means no name was stored: pass NULL instead of an empty name
    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     tbname[0] == 0 ? NULL : tbname);
  }
  return TSDB_CODE_SUCCESS;
}

1379 1380 1381 1382
// Build the re-scan range block (pDestBlock) from pSrcBlock, choosing the generator that matches the
// window kind of this stream scan: interval, session/state, or (otherwise) the delete-result generator.
// Regardless of the generator's result, the destination block is then tagged as STREAM_CLEAR, stamped
// with the source version, marked as loaded, and its ts window is refreshed from column 0.
// Returns the code produced by the selected generator.
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t rangeCode = TSDB_CODE_SUCCESS;

  if (isIntervalWindow(pInfo)) {
    rangeCode = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
  } else {
    if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
      rangeCode = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
    } else {
      rangeCode = generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
    }
  }

  // the block meta is updated even when the generator reported an error
  pDestBlock->info.dataLoad = 1;
  pDestBlock->info.version = pSrcBlock->info.version;
  pDestBlock->info.type = STREAM_CLEAR;
  blockDataUpdateTsWindow(pDestBlock, 0);

  return rangeCode;
}

L
Liu Jicong 已提交
1395 1396 1397
#if 0
// NOTE: this function is compiled out.
// Looks up the tag stored in the stream state for the block's group id and copies it into
// pBlock->info.pTag (resizing that buffer to the stored length). When the lookup fails,
// pBlock->info.pTag is simply reset to NULL.
void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp*    pSup = &pInfo->tagCalSup;
  SStreamState* pStreamState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;

  if (pSup == NULL || pSup->numOfExprs == 0) {
    return;
  }
  if (pBlock == NULL || pBlock->info.rows == 0) {
    return;
  }

  void*   pStored = NULL;
  int32_t storedLen = 0;
  if (streamStateGetParTag(pStreamState, pBlock->info.id.groupId, &pStored, &storedLen) != 0) {
    // nothing stored for this group
    pBlock->info.pTag = NULL;
    tdbFree(pStored);
    return;
  }

  pBlock->info.tagLen = storedLen;
  void* pResized = taosMemoryRealloc(pBlock->info.pTag, storedLen);
  if (pResized == NULL) {
    // resize failed: drop both the fetched value and the old buffer
    tdbFree(pStored);
    taosMemoryFree(pBlock->info.pTag);
    pBlock->info.pTag = NULL;
    pBlock->info.tagLen = 0;
    return;
  }

  pBlock->info.pTag = pResized;
  memcpy(pBlock->info.pTag, pStored, storedLen);
  tdbFree(pStored);
}
#endif
L
Liu Jicong 已提交
1424

5
54liuyao 已提交
1425
// Reset pInfo->pCreateTbRes and, when a table-name or tag expression is configured, append one
// "create table" row for the block's group id to it. Without any such expression the block's
// partition table name is cleared instead.
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp*    pNameSup = &pInfo->tbnameCalSup;
  SExprSupp*    pTagSup = &pInfo->tagCalSup;
  SStreamState* pStreamState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;

  blockDataCleanup(pInfo->pCreateTbRes);

  if (pNameSup->numOfExprs == 0 && pTagSup->numOfExprs == 0) {
    pBlock->info.parTbName[0] = 0;
    return;
  }

  appendCreateTableRow(pStreamState, pNameSup, pTagSup, pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes);
}

1437 1438
// Append a single row to a stream "special" block at position pBlock->info.rows and bump the row count.
//   pStartTs / pEndTs : written to the start/end ts columns and also to the calculate start/end columns
//   pUid / pGp        : written to the uid and group id columns
//   pTbName           : written to the table name column; a NULL pointer stores a NULL cell
// The caller is expected to have reserved room for the new row.
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  // fixed-width cells, listed in the order they are written
  const struct {
    int32_t     slot;
    const char* pVal;
  } cells[] = {
      {START_TS_COLUMN_INDEX, (const char*)pStartTs},
      {END_TS_COLUMN_INDEX, (const char*)pEndTs},
      {UID_COLUMN_INDEX, (const char*)pUid},
      {GROUPID_COLUMN_INDEX, (const char*)pGp},
      {CALCULATE_START_TS_COLUMN_INDEX, (const char*)pStartTs},
      {CALCULATE_END_TS_COLUMN_INDEX, (const char*)pEndTs},
  };

  for (size_t k = 0; k < sizeof(cells) / sizeof(cells[0]); ++k) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, cells[k].slot);
    colDataSetVal(pCol, pBlock->info.rows, cells[k].pVal, false);
  }

  // the table name is the only nullable cell
  SColumnInfoData* pNameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
  colDataSetVal(pNameCol, pBlock->info.rows, (const char*)pTbName, pTbName == NULL);

  pBlock->info.rows++;
}

1456
// Run the update check on every row of pBlock.
//
//   pInfo      : stream scan state; pUpdateInfo, twAggSup, interval and pUpdateDataRes are used here
//   invertible : not used by this function (kept so existing callers keep compiling)
//   pBlock     : data block whose primary ts column (pInfo->primaryTsIndex) is checked
//   out        : when true, pInfo->pUpdateDataRes is reset and filled with one row per input row that
//                is reported as updated or that falls into an already closed and deleted window
//                (plus a second row carrying the calculated group id when partitionSup.needCalc is set)
//
// When `out` is false the per-row checks are still executed but nothing is collected.
// If the output block cannot be grown, an error is logged and the call degrades to the
// `out == false` behaviour instead of writing past the end of the block.
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  (void)invertible;

  bool collect = out;
  if (collect) {
    blockDataCleanup(pInfo->pUpdateDataRes);
    // up to two rows may be appended per input row, see the loop below
    int32_t code = blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to ensure capacity of update data block, code:%d", code);
      collect = false;
    }
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
  bool   tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);

  for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
    // fully initialised placeholder; only cur.pageId is given a meaningful value
    SResultRowInfo dumyInfo = {0};
    dumyInfo.cur.pageId = -1;

    bool        isClosed = false;
    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    bool        overDue = isOverdue(tsCol[rowId], &pInfo->twAggSup);
    if (pInfo->igExpired && overDue) {
      continue;
    }

    if (tableInserted && overDue) {
      win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }

    // must check update info first.
    bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]);
    bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
                     isDeletedStreamWindow(&win, pBlock->info.id.groupId,
                                           pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup);
    if (!(update || closedWin) || !collect) {
      continue;
    }

    qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
    uint64_t gpId = 0;
    appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId,
                                     NULL);
    if (closedWin && pInfo->partitionSup.needCalc) {
      // second row for the same ts, this time with the group id calculated from the row data
      gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid,
                                       &gpId, NULL);
    }
  }

  if (collect && pInfo->pUpdateDataRes->info.rows > 0) {
    pInfo->pUpdateDataRes->info.version = pBlock->info.version;
    pInfo->pUpdateDataRes->info.dataLoad = 1;
    blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
    pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
  }
}
L
Liu Jicong 已提交
1503

1504
// Move the content of pBlock into the operator's result block (pInfo->pRes).
//
//   pInfo  : stream scan state; pRes receives the data, matchInfo selects/places the columns
//   pBlock : source block; its resources are released with blockDataFreeRes() before this function
//            returns (also on the error paths), so the caller must not use its columns afterwards
//   filter : when true, the operator's filter is applied to pRes after the copy
//
// Columns requested by matchInfo but missing from pBlock are filled with NULLs. Pseudo columns are
// added when configured; a failure there (other than "table not exist") aborts the task through
// T_LONG_JMP. Finally the create-table result block is recalculated via calBlockTbName().
//
// Returns 0 on success. If pRes cannot be grown to hold pBlock's rows, the error code of
// blockDataEnsureCapacity() is returned and pRes is left with zero rows.
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  int32_t code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    // copying without enough room would write past the column buffers
    qError("failed to ensure capacity of stream scan result block, code:%d", code);
    pBlockInfo->rows = 0;
    blockDataFreeRes((SSDataBlock*)pBlock);
    return code;
  }

  pBlockInfo->rows = pBlock->info.rows;
  pBlockInfo->id.uid = pBlock->info.id.uid;
  pBlockInfo->type = STREAM_NORMAL;
  pBlockInfo->version = pBlock->info.version;

  pBlockInfo->id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);

  // todo extract method
  int32_t numOfMatch = (int32_t)taosArrayGetSize(pInfo->matchInfo.pList);
  int32_t numOfSrcCols = (int32_t)blockDataGetNumOfCols(pBlock);
  for (int32_t i = 0; i < numOfMatch; ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
      continue;
    }

    SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);

    bool colExists = false;
    for (int32_t j = 0; j < numOfSrcCols; ++j) {
      SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
      if (pResCol->info.colId == pColMatchInfo->colId) {
        colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
        colExists = true;
        break;
      }
    }

    // the required column does not exist in the submit block, let's set it to be all null value
    if (!colExists) {
      colDataSetNNULL(pDst, 0, pBlockInfo->rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
                                  pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reset the error code.
    terrno = 0;
  }

  if (filter) {
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
  }

  pInfo->pRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
  blockDataFreeRes((SSDataBlock*)pBlock);

  calBlockTbName(pInfo, pInfo->pRes);
  return 0;
}
5
54liuyao 已提交
1568

L
Liu Jicong 已提交
1569
// Fetch the next result block for a queue (tmq) scan. Three sources are tried in order:
//   1. a submit message attached to the task (streamInfo.submit),
//   2. the tsdb snapshot, while prepareStatus.type is TMQ_OFFSET__SNAPSHOT_DATA,
//   3. the wal log, while prepareStatus.type is TMQ_OFFSET__LOG.
// Returns a block with at least one row, or NULL when the current source has nothing (more) to return.
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("start to exec queue scan");

  // ---- source 1: submit message handed over with the task ----
  if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
    if (pInfo->tqReader->msg2.msgStr == NULL) {
      // the reader has not been bound to this message yet
      SPackedData packed = pTaskInfo->streamInfo.submit;
      if (tqReaderSetSubmitReq2(pInfo->tqReader, packed.msgStr, packed.msgLen, packed.ver) < 0) {
        qError("submit msg messed up when initing stream submit block %p", packed.msgStr);
        pInfo->tqReader->msg2 = (SPackedData){0};
        pInfo->tqReader->setMsg = 0;
        ASSERT(0);
      }
    }

    blockDataCleanup(pInfo->pRes);

    while (tqNextDataBlock2(pInfo->tqReader)) {
      SSDataBlock submitBlock = {0};

      int32_t retrieveCode = tqRetrieveDataBlock2(&submitBlock, pInfo->tqReader, NULL);
      if (retrieveCode == TSDB_CODE_SUCCESS && submitBlock.info.rows != 0) {
        setBlockIntoRes(pInfo, &submitBlock, true);

        // rows may have been removed by the filter; only hand out non-empty results
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
      }
    }

    // message exhausted: unbind it from both the reader and the task
    pInfo->tqReader->msg2 = (SPackedData){0};
    pInfo->tqReader->setMsg = 0;
    pTaskInfo->streamInfo.submit = (SPackedData){0};
    return NULL;
  }

  // ---- source 2: tsdb snapshot ----
  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pSnapBlock = doTableScan(pInfo->pTableScanOp);
    if (pSnapBlock != NULL && pSnapBlock->info.rows > 0) {
      qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pSnapBlock->info.rows,
             pSnapBlock->info.window.skey, pSnapBlock->info.window.ekey, pInfo->tqReader->pWalReader->curVersion);
      pTaskInfo->streamInfo.returned = 1;
      return pSnapBlock;
    }

    if (pTaskInfo->streamInfo.returned) {
      return NULL;
    }

    // nothing has been returned from the snapshot so far: release the reader and continue with the wal
    STableScanInfo* pTableScan = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTableScan->base.dataReader);
    qDebug("3");
    pTableScan->base.dataReader = NULL;
    tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
    if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
      tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
      return NULL;
    }
    // fall through to the log scan below
  }

  // ---- source 3: wal log ----
  if (pTaskInfo->streamInfo.prepareStatus.type != TMQ_OFFSET__LOG) {
    qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type);
    return NULL;
  }

  for (;;) {
    SFetchRet fetched = {0};
    // if the end is reached, terrno is 0
    if (tqNextBlock(pInfo->tqReader, &fetched) < 0 && terrno != 0) {
      qError("failed to get next log block since %s", terrstr());
    }

    if (fetched.fetchType == FETCH_TYPE__DATA) {
      blockDataCleanup(pInfo->pRes);
      setBlockIntoRes(pInfo, &fetched.data, true);
      if (pInfo->pRes->info.rows > 0) {
        pOperator->status = OP_EXEC_RECV;
        qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
        return pInfo->pRes;
      }
      continue;
    }

    if (fetched.fetchType == FETCH_TYPE__META) {
      // meta entries are not expected here; report and keep reading
      qError("unexpected ret.fetchType:%d", fetched.fetchType);
      continue;
    }

    bool stopHere = (fetched.fetchType == FETCH_TYPE__NONE) ||
                    (fetched.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV);
    if (stopHere) {
      pTaskInfo->streamInfo.lastStatus = fetched.offset;
      char formatBuf[80];
      tFormatOffset(formatBuf, 80, &fetched.offset);
      qDebug("queue scan log return null, offset %s", formatBuf);
      pOperator->status = OP_OPENED;
      return NULL;
    }
  }
}

L
Liu Jicong 已提交
1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697
// Copy into pDst only those delete rows of pSrc whose uid is present in the tq reader's table id hash.
//
//   pDst  : destination block; rows are written starting at index 0
//   pSrc  : source delete block (start ts / end ts / uid columns are read)
//   pInfo : stream scan state; pInfo->tqReader->tbIdHash is the uid filter
//
// For each kept row the start ts, end ts and uid are copied, while the group id and the two
// calculate-ts columns are set to NULL. Afterwards pDst->info is overwritten with pSrc->info,
// except that `rows` becomes the number of kept rows and `capacity` keeps pDst's own value.
//
// Returns 0 on success. If pDst cannot be grown to hold pSrc's rows, the error code of
// blockDataEnsureCapacity() is returned and pDst is left with zero rows.
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;

  int32_t code = blockDataEnsureCapacity(pDst, rows);
  if (code != TSDB_CODE_SUCCESS) {
    // writing rows without enough room would overflow the column buffers
    qError("failed to ensure capacity of delete block, code:%d", code);
    pDst->info.rows = 0;
    return code;
  }

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  uint64_t*        startCol = (uint64_t*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  uint64_t*        endCol = (uint64_t*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  // looked up once instead of once per kept row
  SColumnInfoData* pDstGpCol = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t j = 0;  // next free row in pDst
  for (int32_t i = 0; i < rows; i++) {
    if (taosHashGet(pReader->tbIdHash, &uidCol[i], sizeof(uint64_t)) == NULL) {
      continue;
    }

    colDataSetVal(pDstStartCol, j, (const char*)&startCol[i], false);
    colDataSetVal(pDstEndCol, j, (const char*)&endCol[i], false);
    colDataSetVal(pDstUidCol, j, (const char*)&uidCol[i], false);

    colDataSetNULL(pDstGpCol, j);
    colDataSetNULL(pDstCalStartCol, j);
    colDataSetNULL(pDstCalEndCol, j);
    j++;
  }

  // take over the source meta, but keep the destination's own capacity
  uint32_t cap = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.rows = j;
  pDst->info.capacity = cap;

  return 0;
}

5
54liuyao 已提交
1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727
// for partition by tag
// Fill the group id column of a special block from each row's uid (via getGroupIdByUid()).
// Nothing is done when pInfo->partitionSup.needCalc is set; in that case the column is left
// untouched by this function.
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataSetVal(pGpCol, i, (const char*)&groupId, false);
  }
}

5
54liuyao 已提交
1733
// Run the update check for pBlock (only when update info is configured), advance twAggSup.maxTs to
// at least endKey, and, if the check produced rows in pInfo->pUpdateDataRes, select the scan mode
// that will consume them on the next call.
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
  if (!pInfo->pUpdateInfo) {
    return;
  }

  checkUpdateData(pInfo, true, pBlock, true);
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);

  if (!(pInfo->pUpdateDataRes->info.rows > 0)) {
    return;
  }

  pInfo->updateResIndex = 0;
  switch (pInfo->pUpdateDataRes->info.type) {
    case STREAM_CLEAR:
      pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      break;
    case STREAM_INVERT:
      pInfo->scanMode = STREAM_SCAN_FROM_RES;
      break;
    case STREAM_DELETE_DATA:
      pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
      break;
    default:
      // any other block type leaves the scan mode unchanged
      break;
  }
}

L
Liu Jicong 已提交
1751 1752 1753 1754 1755
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

L
Liu Jicong 已提交
1756
  qDebug("stream scan called");
H
Haojun Liao 已提交
1757

1758 1759
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
L
Liu Jicong 已提交
1760
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1761
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
1762
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
H
Haojun Liao 已提交
1763 1764 1765 1766
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
      qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1767
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
1768
    } else {
H
Haojun Liao 已提交
1769 1770 1771 1772
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
      qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1773
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
1774
    }
L
Liu Jicong 已提交
1775 1776

    /*resetTableScanInfo(pTSInfo, pWin);*/
H
Haojun Liao 已提交
1777
    tsdbReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1778 1779
    qDebug("4");

H
Haojun Liao 已提交
1780
    pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
1781
    pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1782

L
Liu Jicong 已提交
1783 1784
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
L
Liu Jicong 已提交
1785
    pTaskInfo->streamInfo.recoverScanFinished = false;
L
Liu Jicong 已提交
1786 1787
  }

5
54liuyao 已提交
1788 1789
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
L
Liu Jicong 已提交
1790 1791 1792 1793 1794
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }
5
54liuyao 已提交
1795 1796 1797 1798 1799 1800 1801

    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        printDataBlock(pInfo->pRecoverRes, "scan recover");
        return pInfo->pRecoverRes;
      } break;
5
54liuyao 已提交
1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
          // printDataBlock(pSDB, "stream scan update");
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
5
54liuyao 已提交
1823 1824 1825 1826 1827 1828
      default:
        break;
    }

    pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
    if (pInfo->pRecoverRes != NULL) {
L
Liu Jicong 已提交
1829
      pInfo->blockRecoverContiCnt++;
5
54liuyao 已提交
1830
      calBlockTbName(pInfo, pInfo->pRecoverRes);
1831
      if (pInfo->pUpdateInfo) {
5
54liuyao 已提交
1832 1833 1834 1835 1836 1837
        if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
          TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
        } else {
          doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
        }
1838
      }
5
54liuyao 已提交
1839 1840 1841 1842 1843 1844 1845
      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
        return pInfo->pCreateTbRes;
      }
      qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows);
      printDataBlock(pInfo->pRecoverRes, "scan recover");
      return pInfo->pRecoverRes;
L
Liu Jicong 已提交
1846 1847
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
L
Liu Jicong 已提交
1848
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1849
    tsdbReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1850 1851
    qDebug("5");

H
Haojun Liao 已提交
1852
    pTSInfo->base.dataReader = NULL;
1853

H
Haojun Liao 已提交
1854 1855
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
L
Liu Jicong 已提交
1856

L
Liu Jicong 已提交
1857
    pTaskInfo->streamInfo.recoverScanFinished = true;
L
Liu Jicong 已提交
1858 1859 1860
    return NULL;
  }

5
54liuyao 已提交
1861
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
5
54liuyao 已提交
1862
// TODO: refactor
L
Liu Jicong 已提交
1863
FETCH_NEXT_BLOCK:
L
Liu Jicong 已提交
1864
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
1865
    if (pInfo->validBlockIndex >= total) {
L
Liu Jicong 已提交
1866
      doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
1867
      /*pOperator->status = OP_EXEC_DONE;*/
H
Haojun Liao 已提交
1868 1869 1870
      return NULL;
    }

1871
    int32_t      current = pInfo->validBlockIndex++;
L
Liu Jicong 已提交
1872 1873
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
5
54liuyao 已提交
1874
    if (pBlock->info.parTbName[0]) {
H
Haojun Liao 已提交
1875
      streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
1876
    }
1877
    // TODO move into scan
5
54liuyao 已提交
1878 1879
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
1880
    pBlock->info.dataLoad = 1;
1881
    blockDataUpdateTsWindow(pBlock, 0);
1882
    switch (pBlock->info.type) {
L
Liu Jicong 已提交
1883 1884 1885
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
1886 1887 1888
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
1889 1890
        copyDataBlock(pInfo->pUpdateRes, pBlock);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
1891 1892 1893
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
1894
        printDataBlock(pBlock, "stream scan delete recv");
L
Liu Jicong 已提交
1895
        SSDataBlock* pDelBlock = NULL;
L
Liu Jicong 已提交
1896
        if (pInfo->tqReader) {
L
Liu Jicong 已提交
1897
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
L
Liu Jicong 已提交
1898
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
L
Liu Jicong 已提交
1899 1900
        } else {
          pDelBlock = pBlock;
L
Liu Jicong 已提交
1901
        }
5
54liuyao 已提交
1902 1903
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
5
54liuyao 已提交
1904 1905 1906 1907 1908 1909
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
1910
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
L
Liu Jicong 已提交
1911
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
1912
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
L
Liu Jicong 已提交
1913
          printDataBlock(pDelBlock, "stream scan delete result");
H
Haojun Liao 已提交
1914 1915
          blockDataDestroy(pDelBlock);

L
Liu Jicong 已提交
1916 1917 1918 1919 1920
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1921 1922 1923
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
L
Liu Jicong 已提交
1924
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
1925 1926 1927
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
L
Liu Jicong 已提交
1928 1929 1930 1931
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
L
Liu Jicong 已提交
1932
          if (pInfo->pDeleteDataRes->info.rows > 0) {
5
54liuyao 已提交
1933
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
L
Liu Jicong 已提交
1934 1935 1936 1937
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1938
        }
1939 1940 1941
      } break;
      default:
        break;
5
54liuyao 已提交
1942
    }
1943
    // printDataBlock(pBlock, "stream scan recv");
1944
    return pBlock;
L
Liu Jicong 已提交
1945
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
L
Liu Jicong 已提交
1946
    qDebug("scan mode %d", pInfo->scanMode);
5
54liuyao 已提交
1947 1948 1949
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
5
54liuyao 已提交
1950
        doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
5
54liuyao 已提交
1951 1952 1953
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
1954 1955 1956
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
5
54liuyao 已提交
1957
      } break;
1958
      case STREAM_SCAN_FROM_DELETE_DATA: {
1959 1960 1961 1962 1963 1964 1965
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
1966 1967 1968 1969 1970 1971 1972 1973 1974 1975
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
1976
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1977 1978
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
5
54liuyao 已提交
1979 1980
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
1981
          // printDataBlock(pSDB, "stream scan update");
L
Liu Jicong 已提交
1982
          calBlockTbName(pInfo, pSDB);
5
54liuyao 已提交
1983 1984
          return pSDB;
        }
1985
        blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
1986 1987 1988 1989
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
1990
    }
1991

1992
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
5
54liuyao 已提交
1993
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
1994 1995
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
5
54liuyao 已提交
1996 1997
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
1998 1999
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
      return pInfo->pUpdateRes;
5
54liuyao 已提交
2000
    }
5
54liuyao 已提交
2001

H
Haojun Liao 已提交
2002 2003
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

2004
    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
2005

L
Liu Jicong 已提交
2006
  NEXT_SUBMIT_BLK:
2007
    while (1) {
L
Liu Jicong 已提交
2008
      if (pInfo->tqReader->msg2.msgStr == NULL) {
2009
        if (pInfo->validBlockIndex >= totBlockNum) {
5
54liuyao 已提交
2010
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
L
Liu Jicong 已提交
2011
          doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
2012
          qDebug("stream scan return empty, consume block %d", totBlockNum);
2013 2014
          return NULL;
        }
2015

L
Liu Jicong 已提交
2016 2017
        int32_t      current = pInfo->validBlockIndex++;
        SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
L
Liu Jicong 已提交
2018
        /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
L
Liu Jicong 已提交
2019
        if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
2020 2021 2022 2023
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          continue;
        }
H
Haojun Liao 已提交
2024 2025
      }

2026 2027
      blockDataCleanup(pInfo->pRes);

L
Liu Jicong 已提交
2028
      while (tqNextDataBlock2(pInfo->tqReader)) {
2029
        SSDataBlock block = {0};
2030

2031
        int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
2032 2033 2034 2035 2036

        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

2037
        setBlockIntoRes(pInfo, &block, false);
2038

H
Haojun Liao 已提交
2039
        if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId,
L
Liu Jicong 已提交
2040
                             pInfo->pRes->info.version)) {
2041 2042 2043 2044 2045
          printDataBlock(pInfo->pRes, "stream scan ignore");
          blockDataCleanup(pInfo->pRes);
          continue;
        }

5
54liuyao 已提交
2046 2047 2048
        if (pInfo->pCreateTbRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return pInfo->pCreateTbRes;
2049 2050
        }

5
54liuyao 已提交
2051
        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
H
Haojun Liao 已提交
2052
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
2053
        pInfo->pRes->info.dataLoad = 1;
2054 2055 2056
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
2057 2058 2059
          break;
        }
      }
2060
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
5
54liuyao 已提交
2061
        break;
J
jiacy-jcy 已提交
2062
      } else {
2063
        continue;
5
54liuyao 已提交
2064
      }
H
Haojun Liao 已提交
2065 2066 2067 2068
    }

    // record the scan action.
    pInfo->numOfExec++;
2069
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
2070
    // printDataBlock(pInfo->pRes, "stream scan");
H
Haojun Liao 已提交
2071

L
Liu Jicong 已提交
2072
    qDebug("scan rows: %d", pBlockInfo->rows);
L
Liu Jicong 已提交
2073 2074 2075
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }
2076 2077 2078 2079 2080 2081

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
L
Liu Jicong 已提交
2082 2083 2084
  } else {
    ASSERT(0);
    return NULL;
H
Haojun Liao 已提交
2085 2086 2087
  }
}

H
Haojun Liao 已提交
2088
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2089 2090 2091
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2092 2093 2094
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2095 2096 2097 2098 2099 2100
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2101
// Next-block function of the raw scan operator.
//
// What happens depends on pTaskInfo->streamInfo.prepareStatus.type:
//  - TMQ_OFFSET__SNAPSHOT_DATA: return the next block of the current data reader and
//    record its uid / window end key in lastStatus. When there is no reader or it has no
//    more blocks, ask the snapshot context for the next table and return NULL.
//  - TMQ_OFFSET__SNAPSHOT_META: fetch one meta entry from the snapshot context, publish it
//    through streamInfo.metaRsp / lastStatus, and return NULL.
//  - any other type: return NULL (the former log-offset branch existed only as
//    commented-out code).
//
// Note: the operator status is never consulted here.
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  SStreamRawScanInfo* pInfo = pOperator->info;

  // a non-zero metaRspLen tells the consumer of this task that the result is meta data
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;

  qDebug("tmqsnap doRawScan called");

  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
      if (isTaskKilled(pTaskInfo)) {
        // give the block back before leaving through the task's jump buffer
        tsdbReleaseDataBlock(pInfo->dataReader);
        longjmp(pTaskInfo->env, pTaskInfo->code);
      }

      SSDataBlock* pDataBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
      if (pDataBlock == NULL) {
        longjmp(pTaskInfo->env, terrno);
      }

      qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pDataBlock->info.id.uid);
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.lastStatus.uid = pDataBlock->info.id.uid;
      pTaskInfo->streamInfo.lastStatus.ts = pDataBlock->info.window.ekey;
      return pDataBlock;
    }

    // no reader, or the current reader is drained: move on to the next table of the snapshot
    SMetaTableInfo nextTable = getUidfromSnapShot(pInfo->sContext);
    pTaskInfo->streamInfo.prepareStatus.uid = nextTable.uid;

    if (nextTable.uid != 0) {
      pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
      qDebug("tmqsnap change get data uid:%" PRId64 "", nextTable.uid);
      qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
    } else {
      // uid 0: the snapshot has been read completely, continue from the wal
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
      pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
    }

    tDeleteSSchemaWrapper(nextTable.schema);
    qDebug("tmqsnap stream scan tsdb return null");
    return NULL;
  }

  if (pTaskInfo->streamInfo.prepareStatus.type != TMQ_OFFSET__SNAPSHOT_META) {
    return NULL;
  }

  SSnapContext* sContext = pInfo->sContext;
  void*         pMeta = NULL;
  int32_t       metaLen = 0;
  int16_t       msgType = 0;
  int64_t       tbUid = 0;
  if (getMetafromSnapShot(sContext, &pMeta, &metaLen, &msgType, &tbUid) < 0) {
    qError("tmqsnap getMetafromSnapShot error");
    taosMemoryFreeClear(pMeta);
    return NULL;
  }

  // both outcomes below record the same last status
  pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
  pTaskInfo->streamInfo.lastStatus.uid = tbUid;

  if (sContext->queryMetaOrData) {
    // hand the meta payload over through metaRsp
    pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
    pTaskInfo->streamInfo.metaRsp.resMsgType = msgType;
    pTaskInfo->streamInfo.metaRsp.metaRspLen = metaLen;
    pTaskInfo->streamInfo.metaRsp.metaRsp = pMeta;
  } else {
    // change to get data next poll request
    pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
    pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
    pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
  }

  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2213
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2214 2215 2216 2217 2218 2219
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2220 2221 2222
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2223
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2224 2225 2226 2227 2228
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2229 2230
  int32_t code = TSDB_CODE_SUCCESS;

2231
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2232
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2233
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2234 2235
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2236 2237
  }

wmmhello's avatar
wmmhello 已提交
2238 2239
  pInfo->vnode = pHandle->vnode;

2240
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2241 2242
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2243

2244
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2245
  return pOperator;
H
Haojun Liao 已提交
2246

L
Liu Jicong 已提交
2247
_end:
H
Haojun Liao 已提交
2248 2249 2250 2251
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2252 2253
}

2254
static void destroyStreamScanOperatorInfo(void* param) {
2255 2256
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
5
54liuyao 已提交
2257
    destroyOperatorInfo(pStreamScan->pTableScanOp);
2258 2259 2260 2261
  }
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
H
Haojun Liao 已提交
2262 2263
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2264
  }
C
Cary Xu 已提交
2265 2266
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2267
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2268
  }
C
Cary Xu 已提交
2269

L
Liu Jicong 已提交
2270
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
5
54liuyao 已提交
2271
  cleanupExprSupp(&pStreamScan->tagCalSup);
L
Liu Jicong 已提交
2272

L
Liu Jicong 已提交
2273
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2274 2275 2276 2277
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2278
  blockDataDestroy(pStreamScan->pUpdateDataRes);
5
54liuyao 已提交
2279
  blockDataDestroy(pStreamScan->pCreateTbRes);
2280 2281 2282 2283
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2284
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2285
                                            SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2286
  SArray*          pColIds = NULL;
2287 2288
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2289

H
Haojun Liao 已提交
2290
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2291
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2292
    goto _error;
H
Haojun Liao 已提交
2293 2294
  }

2295
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2296
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2297

2298
  pInfo->pTagCond = pTagCond;
2299
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2300

2301
  int32_t numOfCols = 0;
2302 2303
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2304 2305 2306
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2307

H
Haojun Liao 已提交
2308
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2309
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2310
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2311
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2312 2313

    int16_t colId = id->colId;
2314
    taosArrayPush(pColIds, &colId);
2315
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2316
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2317
    }
H
Haojun Liao 已提交
2318 2319
  }

L
Liu Jicong 已提交
2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
      goto _error;
    }
  }

2333 2334
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
5
54liuyao 已提交
2335
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
2336 2337 2338 2339 2340 2341 2342 2343 2344 2345
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  }

L
Liu Jicong 已提交
2346
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
H
Haojun Liao 已提交
2347
  if (pInfo->pBlockLists == NULL) {
2348 2349
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
H
Haojun Liao 已提交
2350 2351
  }

5
54liuyao 已提交
2352
  if (pHandle->vnode) {
L
Liu Jicong 已提交
2353
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
L
Liu Jicong 已提交
2354
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2355
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2356
      pTSInfo->base.cond.endVersion = pHandle->version;
2357
    }
L
Liu Jicong 已提交
2358

2359
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2360
    int32_t        num = 0;
H
Haojun Liao 已提交
2361
    tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num);
2362

2363
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2364
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2365
      pTSInfo->base.dataReader = NULL;
2366
      pTaskInfo->streamInfo.lastStatus.uid = -1;
L
Liu Jicong 已提交
2367 2368
    }

L
Liu Jicong 已提交
2369 2370 2371 2372
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2373
    } else {
L
Liu Jicong 已提交
2374 2375
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2376 2377
    }

2378
    pInfo->pUpdateInfo = NULL;
2379
    pInfo->pTableScanOp = pTableScanOp;
2380 2381 2382
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2383

L
Liu Jicong 已提交
2384 2385
    pInfo->readHandle = *pHandle;
    pInfo->tableUid = pScanPhyNode->uid;
L
Liu Jicong 已提交
2386
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
5
54liuyao 已提交
2387 2388
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
L
Liu Jicong 已提交
2389

L
Liu Jicong 已提交
2390
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2391
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
H
Haojun Liao 已提交
2392
    SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList);
2393
    code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2394 2395 2396 2397 2398
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2399
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2400 2401
  } else {
    taosArrayDestroy(pColIds);
H
Haojun Liao 已提交
2402
    pColIds = NULL;
5
54liuyao 已提交
2403 2404
  }

2405 2406 2407 2408 2409
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2410 2411 2412 2413 2414
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2415
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2416
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2417
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2418
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2419
  pInfo->groupId = 0;
2420
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2421
  pInfo->pStreamScanOp = pOperator;
2422
  pInfo->deleteDataIndex = 0;
2423
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2424
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2425
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2426
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2427
  pInfo->partitionSup.needCalc = false;
5
54liuyao 已提交
2428 2429
  pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
  pInfo->igExpired = pTableScanNode->igExpired;
2430
  pInfo->twAggSup.maxTs = INT64_MIN;
L
Liu Jicong 已提交
2431

L
Liu Jicong 已提交
2432 2433
  setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2434
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2435

L
Liu Jicong 已提交
2436
  __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2437 2438
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2439

H
Haojun Liao 已提交
2440
  return pOperator;
2441

L
Liu Jicong 已提交
2442
_error:
H
Haojun Liao 已提交
2443 2444 2445 2446 2447 2448 2449 2450
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2451 2452
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2453 2454
}

2455
// Next-block function of the tag scan operator.
//
// Starting at pInfo->curPos, emits one row per table of the task's table list until either
// the list is exhausted or the result capacity is reached. For every output expression the
// row receives either the table name (scan pseudo column function) or the value of the
// referenced tag. Returns the filled result block, or NULL when no row was produced.
// If the meta entry of a table cannot be loaded, the function leaves through the task's
// jump buffer.
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprInfo = &pOperator->exprSupp.pExprInfo[0];
  SSDataBlock*   pRes = pInfo->pRes;

  blockDataCleanup(pRes);

  int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
  if (numOfTables == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        nameBuf[512] = {0};
  int32_t     numOfRows = 0;
  SMetaReader mr = {0};
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < numOfTables && numOfRows < pOperator->resultInfo.capacity) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->curPos);

    int32_t code = metaGetTableEntryByUid(&mr, pKeyInfo->uid);
    tDecoderClear(&mr.coder);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKeyInfo->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&mr);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
      SExprInfo*       pExpr = &pExprInfo[j];
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExpr->base.resSchema.slotId);

      if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
        // pseudo column: the table name, stored as a var-string
        STR_TO_VARSTR(nameBuf, mr.me.name);
        colDataSetVal(pDst, numOfRows, nameBuf, false);
        continue;
      }

      // otherwise it is a tag value
      STagVal val = {0};
      val.cid = pExpr->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val);

      // non-json tags that were found are converted; json (or missing) values are used as returned
      bool  converted = (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL);
      char* data = converted ? tTagValToData((const STagVal*)p, false) : (char*)p;

      bool isNull = (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      colDataSetVal(pDst, numOfRows, data, isNull);

      // only the converted buffer of a var-length tag is freed here
      if (converted && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && data != NULL) {
        taosMemoryFree(data);
      }
    }

    numOfRows += 1;
    pInfo->curPos += 1;
    if (pInfo->curPos >= numOfTables) {
      setOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&mr);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = numOfRows;
  pOperator->resultInfo.totalRows += numOfRows;

  if (numOfRows == 0) {
    return NULL;
  }
  return pRes;
}

2536
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2537 2538
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2539
  taosArrayDestroy(pInfo->matchInfo.pList);
D
dapan1121 已提交
2540
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2541 2542
}

S
slzhou 已提交
2543 2544
// Create the tag scan operator.
//
//  pReadHandle  copied into the operator info (doTagScan reads its meta handle)
//  pPhyNode     physical plan node; pScanPseudoCols supplies both the output expressions
//               and the column match info
//  pTaskInfo    owning task
//
// Returns the new operator, or NULL on failure. On failure terrno carries the code of the
// step that failed (TSDB_CODE_OUT_OF_MEMORY for allocation failures) and everything set up
// so far, including the expression supporter of the operator, is released.
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
                                         SExecTaskInfo* pTaskInfo) {
  int32_t        code = TSDB_CODE_SUCCESS;  // declared up front: it is read at _error
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
  if (pInfo->pRes == NULL) {
    // the block is handed to blockDataEnsureCapacity below; do not continue without it
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;

  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  initResultSizeInfo(&pOperator->resultInfo, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);

  return pOperator;

_error:
  if (pInfo != NULL) {
    // pRes is still NULL at every jump to this label, only the match list may exist
    taosArrayDestroy(pInfo->matchInfo.pList);
    taosMemoryFree(pInfo);
  }

  if (pOperator != NULL) {
    // release what initExprSupp attached to the operator before freeing it
    cleanupExprSupp(&pOperator->exprSupp);
    taosMemoryFree(pOperator);
  }

  // keep the real failure code; plain allocation failures leave code untouched
  terrno = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2586

dengyihao's avatar
dengyihao 已提交
2587
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2588 2589 2590 2591 2592 2593
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
D
dapan1121 已提交
2594
  int32_t                         code = 0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2595

H
Haojun Liao 已提交
2596
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2597

L
Liu Jicong 已提交
2598 2599
  int64_t      st = taosGetTimestampUs();
  void*        p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2600
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2601

D
dapan1121 已提交
2602 2603 2604 2605 2606
  if (NULL == source->dataReader || !source->multiReader) {
    code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo));
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
dengyihao's avatar
dengyihao 已提交
2607
  }
D
dapan1121 已提交
2608 2609
  
  pInfo->base.dataReader = source->dataReader;
H
Haojun Liao 已提交
2610
  STsdbReader* reader = pInfo->base.dataReader;
2611
  qTrace("tsdb/read-table-data: %p, enter next reader", reader);
dengyihao's avatar
opt mem  
dengyihao 已提交
2612
  while (tsdbNextDataBlock(reader)) {
H
Haojun Liao 已提交
2613
    if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2614
      tsdbReleaseDataBlock(reader);
D
dapan1121 已提交
2615
      pInfo->base.dataReader = NULL;
2616
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2617 2618 2619
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2620
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2621 2622 2623 2624
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2625
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2626 2627 2628 2629
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2630 2631

    uint32_t status = 0;
2632
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2633
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2634
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2635
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2636 2637 2638 2639 2640 2641 2642
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

H
Haojun Liao 已提交
2643
    pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2644

H
Haojun Liao 已提交
2645
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2646
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2647

2648
    qTrace("tsdb/read-table-data: %p, close reader", reader);
D
dapan1121 已提交
2649 2650 2651 2652
    if (!source->multiReader) {
      tsdbReaderClose(pInfo->base.dataReader);
      source->dataReader = NULL;
    }
H
Haojun Liao 已提交
2653
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2654 2655
    return pBlock;
  }
H
Haojun Liao 已提交
2656

D
dapan1121 已提交
2657 2658 2659 2660
  if (!source->multiReader) {
    tsdbReaderClose(pInfo->base.dataReader);
    source->dataReader = NULL;
  }
H
Haojun Liao 已提交
2661
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2662 2663 2664
  return NULL;
}

2665 2666 2667
/*
 * Build the sort specification used to merge rows by the primary timestamp column.
 *
 * colMatchInfo  array of SColMatchItem; the destination slot of the entry whose column id is
 *               PRIMARYKEY_TIMESTAMP_COL_ID becomes the sort slot (slot 0 when no entry matches).
 * order         sort order stored verbatim in the resulting SBlockOrderInfo.
 *
 * Returns a one-element SArray of SBlockOrderInfo owned by the caller, or NULL when the array
 * cannot be allocated.
 */
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsTargetSlotId = 0;

  size_t numOfItems = taosArrayGetSize(colMatchInfo);
  for (size_t i = 0; i < numOfItems; ++i) {
    SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
    if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      tsTargetSlotId = colInfo->dstSlotId;
    }
  }

  SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  if (pList == NULL) {
    // previously the NULL array was handed straight to taosArrayPush()
    return NULL;
  }

  SBlockOrderInfo bi = {0};
  bi.order = order;
  bi.slotId = tsTargetSlotId;
  bi.nullFirst = NULL_ORDER_FIRST;

  taosArrayPush(pList, &bi);

  return pList;
}

H
Haojun Liao 已提交
2685
/*
 * Copy a query condition, giving the copy its own column list.
 *
 * All fields of *src are copied by value into *dst; colList is then replaced by a freshly allocated
 * array holding the same numOfCols entries. Any other pointer members are shared with src (shallow).
 * The caller releases dst->colList with taosMemoryFree().
 *
 * Returns 0 on success. If the column list cannot be allocated, returns TSDB_CODE_OUT_OF_MEMORY and
 * leaves dst with colList == NULL and numOfCols == 0 so the copy stays self-consistent.
 */
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
  *dst = *src;

  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  if (dst->colList == NULL) {
    if (src->numOfCols > 0) {
      dst->numOfCols = 0;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    // a zero-sized allocation may legitimately yield NULL; there is nothing to copy
    return 0;
  }

  for (int32_t i = 0; i < src->numOfCols; i++) {
    dst->colList[i] = src->colList[i];
  }
  return 0;
}
H
Haojun Liao 已提交
2693

2694
/*
 * Prepare the merge sort for the table group that starts at pInfo->tableStartIndex.
 *
 * Steps:
 *   1. find the last table in the table list that shares pInfo->groupId and store it in tableEndIndex;
 *   2. create a multi-source merge sort handle with one buffer page per table plus one for the result;
 *   3. create one sort-source parameter (with its own input block) and one private copy of the
 *      query condition per table;
 *   4. register every parameter as a sort source and open the sort.
 *
 * Returns TSDB_CODE_SUCCESS. Failures leave through T_LONG_JMP on the task environment.
 */
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  {
    // extend the range while the following tables still belong to the current group
    size_t  numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < numOfTables; ++i) {
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  pInfo->base.dataReader = NULL;

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  // readers are kept open across fetches only while the group is small enough
  bool multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM);

  for (int32_t i = 0; i < numOfTable; ++i) {
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.multiReader = multiReader;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);

    taosArrayPush(pInfo->sortSourceParams, &param);

    SQueryTableDataCond cond;
    dumpQueryTableCond(&pInfo->base.cond, &cond);
    taosArrayPush(pInfo->queryConds, &cond);
  }

  // sources are registered only after all params are pushed, so the element addresses taken below
  // are not invalidated by a later growth of sortSourceParams
  for (int32_t i = 0; i < numOfTable; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      // previously dereferenced unchecked; what has been built so far stays reachable through pInfo
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    ps->onlyRef = true;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Tear down the per-group state created by startGroupTableMergeScan().
 *
 * Accumulates the sort statistics of the finished group into pInfo->sortExecInfo, releases every sort
 * source's input block and reader, destroys the sort handle and the per-table query conditions, and
 * resets the limit info for the next group. Always returns TSDB_CODE_SUCCESS.
 */
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pScan = pOperator->info;

  int32_t nTables = taosArrayGetSize(pScan->queryConds);

  // method/buffer reflect the last group, the counters add up over all groups
  SSortExecInfo groupStat = tsortGetSortExecInfo(pScan->pSortHandle);
  pScan->sortExecInfo.sortMethod = groupStat.sortMethod;
  pScan->sortExecInfo.sortBuffer = groupStat.sortBuffer;
  pScan->sortExecInfo.loops += groupStat.loops;
  pScan->sortExecInfo.readBytes += groupStat.readBytes;
  pScan->sortExecInfo.writeBytes += groupStat.writeBytes;

  for (int32_t k = 0; k < nTables; ++k) {
    STableMergeScanSortSourceParam* pSrc = taosArrayGet(pScan->sortSourceParams, k);
    blockDataDestroy(pSrc->inputBlock);
    tsdbReaderClose(pSrc->dataReader);
    pSrc->dataReader = NULL;
  }
  taosArrayClear(pScan->sortSourceParams);

  tsortDestroySortHandle(pScan->pSortHandle);
  pScan->pSortHandle = NULL;

  // each condition owns the column list allocated by dumpQueryTableCond()
  for (int32_t k = 0; k < nTables; ++k) {
    SQueryTableDataCond* pCond = taosArrayGet(pScan->queryConds, k);
    taosMemoryFree(pCond->colList);
  }
  taosArrayDestroy(pScan->queryConds);
  pScan->queryConds = NULL;

  resetLimitInfoForNextGroup(&pScan->limitInfo);
  return TSDB_CODE_SUCCESS;
}

2795 2796
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2797 2798
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2799 2800 2801
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2802
  blockDataCleanup(pResBlock);
2803 2804

  while (1) {
2805
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2806 2807 2808 2809
    if (pTupleHandle == NULL) {
      break;
    }

2810 2811
    appendOneRowToDataBlock(pResBlock, pTupleHandle);
    if (pResBlock->info.rows >= capacity) {
2812 2813 2814 2815
      break;
    }
  }

2816
  bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
X
Xiaoyu Wang 已提交
2817
  qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
2818
         pInfo->limitInfo.numOfOutputRows);
2819

2820
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832
}

/*
 * Next-block function of the table merge scan operator.
 *
 * Tables in the task's table list are processed group by group: the first call starts the merge for
 * the group of table 0; whenever a group is exhausted its state is torn down and the merge for the
 * following group is started. Returns the next sorted block (tagged with the current group id), or
 * NULL once all groups are done or the table list is empty. Open failures and task cancellation leave
 * through T_LONG_JMP.
 */
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  STableMergeScanInfo* pScan = pOperator->info;
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;

  int32_t rc = pOperator->fpSet._openFn(pOperator);
  if (rc != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTask->env, rc);
  }

  size_t numOfTables = tableListGetSize(pTask->pTableInfoList);

  // first invocation: position on the first group
  if (!pScan->hasGroupId) {
    pScan->hasGroupId = true;

    if (numOfTables == 0) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pScan->tableStartIndex = 0;
    STableKeyInfo* pFirstKey = tableListGetInfo(pTask->pTableInfoList, pScan->tableStartIndex);
    pScan->groupId = pFirstKey->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pScan->tableStartIndex < numOfTables) {
    if (isTaskKilled(pTask)) {
      T_LONG_JMP(pTask->env, pTask->code);
    }

    SSDataBlock* pSorted = getSortedTableMergeScanBlockData(pScan->pSortHandle, pScan->pResBlock,
                                                            pOperator->resultInfo.capacity, pOperator);
    if (pSorted != NULL) {
      pSorted->info.id.groupId = pScan->groupId;
      pOperator->resultInfo.totalRows += pSorted->info.rows;
      return pSorted;
    }

    // the current group has been fully emitted; move on to the next one, if any
    stopGroupTableMergeScan(pOperator);
    if (pScan->tableEndIndex >= numOfTables - 1) {
      setOperatorCompleted(pOperator);
      break;
    }

    pScan->tableStartIndex = pScan->tableEndIndex + 1;
    STableKeyInfo* pNextKey = tableListGetInfo(pTask->pTableInfoList, pScan->tableStartIndex);
    pScan->groupId = pNextKey->groupId;
    startGroupTableMergeScan(pOperator);
    resetLimitInfoForNextGroup(&pScan->limitInfo);
  }

  return NULL;
}

2879
void destroyTableMergeScanOperatorInfo(void* param) {
2880
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2881
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2882

dengyihao's avatar
dengyihao 已提交
2883 2884 2885
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2886 2887
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
D
dapan1121 已提交
2888 2889
    tsdbReaderClose(p->dataReader);
    p->dataReader = NULL;
2890
  }
H
Haojun Liao 已提交
2891

D
dapan1121 已提交
2892 2893 2894
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;

2895
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2896 2897
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2898

dengyihao's avatar
opt mem  
dengyihao 已提交
2899 2900 2901
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2902
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2903
  taosArrayDestroy(pTableScanInfo->queryConds);
2904

H
Haojun Liao 已提交
2905 2906
  if (pTableScanInfo->base.matchInfo.pList != NULL) {
    taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
2907 2908 2909 2910 2911 2912
  }

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
H
Haojun Liao 已提交
2913
  cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
L
Liu Jicong 已提交
2914

H
Haojun Liao 已提交
2915 2916
  taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);

D
dapan1121 已提交
2917
  taosMemoryFreeClear(param);
2918 2919 2920 2921
}

/*
 * Explain callback of the table merge scan operator.
 *
 * Allocates an STableMergeScanExecInfo holding a snapshot of the operator's block read recorder and
 * sort statistics. On success *pOptrExplain receives the allocation (ownership passes to the caller)
 * and *len its size. On allocation failure *pOptrExplain is NULL, *len is 0 and
 * TSDB_CODE_OUT_OF_MEMORY is returned.
 */
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    // previously dereferenced unchecked
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->base.readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2934 2935
/*
 * Build the table merge scan operator for a table scan physical plan node.
 *
 * pTableScanNode  plan node; supplies scan columns, pseudo columns, conditions, limits, the sample
 *                 ratio and the output block descriptor.
 * readHandle      read handle; copied by value into the operator's private info.
 * pTaskInfo       owning task.
 *
 * Returns the new operator, or NULL on failure. On failure every resource acquired so far is
 * released and pTaskInfo->code is set to the code of the failing step (TSDB_CODE_OUT_OF_MEMORY when
 * no more specific code is available).
 */
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
                                                SExecTaskInfo* pTaskInfo) {
  // Declared ahead of the first goto so that the _error path never reads indeterminate values.
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;
  bool    condReady = false;

  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
  code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
                             &pInfo->base.matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
  condReady = true;

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    SExprSupp* pSup = &pInfo->base.pseudoSup;
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
  pInfo->base.readHandle = *readHandle;

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;

  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultSizeInfo(&pOperator->resultInfo, 1024);
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
  if (pInfo->pResBlock == NULL) {
    // the block is dereferenced below, so a failed allocation must stop here
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
  if (pInfo->sortSourceParams == NULL || pInfo->pSortInfo == NULL || pInfo->pSortInputBlock == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);

  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);

  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->exprSupp.numOfExprs = numOfCols;

  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  // a failing step may not have produced a code (e.g. terrno left at 0); fall back to out-of-memory
  pTaskInfo->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;

  if (pInfo != NULL) {
    // mirrors destroyTableMergeScanOperatorInfo(), limited to members that were actually set up
    if (condReady) {
      cleanupQueryTableDataCond(&pInfo->base.cond);
    }
    if (pInfo->sortSourceParams != NULL) {
      taosArrayDestroy(pInfo->sortSourceParams);
    }
    if (pInfo->base.matchInfo.pList != NULL) {
      taosArrayDestroy(pInfo->base.matchInfo.pList);
    }
    if (pInfo->pResBlock != NULL) {
      blockDataDestroy(pInfo->pResBlock);
    }
    if (pInfo->pSortInputBlock != NULL) {
      blockDataDestroy(pInfo->pSortInputBlock);
    }
    if (pInfo->pSortInfo != NULL) {
      taosArrayDestroy(pInfo->pSortInfo);
    }
    cleanupExprSupp(&pInfo->base.pseudoSup);
    if (pInfo->base.metaCache.pTableMetaEntryCache != NULL) {
      taosLRUCacheCleanup(pInfo->base.metaCache.pTableMetaEntryCache);
    }
  }
  if (pOperator != NULL) {
    // NOTE(review): expected to release the filter created by filterInitFromNode() as well --
    // confirm against cleanupExprSupp()
    cleanupExprSupp(&pOperator->exprSupp);
  }

  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
3015 3016 3017 3018

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
3019
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
3020 3021 3022 3023 3024 3025
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName);
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
3026 3027
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
3028 3029 3030 3031 3032 3033 3034
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

/*
 * Record the output slot ids of the db-name and stable-name columns found in the scan column list.
 *
 * Every list entry must be a target node wrapping a column node; otherwise TSDB_CODE_QRY_SYS_ERROR is
 * returned. Columns with other names are ignored. A NULL list is accepted and changes nothing.
 */
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, scanCols) {
    if (nodeType(pItem) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pItem;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    const char* name = ((SColumnNode*)(pTarget->pExpr))->colName;
    if (strcmp(name, GROUP_TAG_DB_NAME) == 0) {
      supp->dbNameSlotId = pTarget->slotId;
    } else if (strcmp(name, GROUP_TAG_STABLE_NAME) == 0) {
      supp->stbNameSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Record the output slot id of the table-count function found in the pseudo column list.
 *
 * Every list entry must be a target node wrapping a function node; otherwise TSDB_CODE_QRY_SYS_ERROR
 * is returned. Functions of other types are ignored. A NULL list is accepted and changes nothing.
 */
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, pseudoCols) {
    if (nodeType(pItem) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pItem;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_FUNCTION) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SFunctionNode* pFunc = (SFunctionNode*)(pTarget->pExpr);
    if (pFunc->funcType == FUNCTION_TYPE_TABLE_COUNT) {
      supp->tbCountSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Derive the grouping / filtering mode of the table count scan.
 *
 * Without group tags the db name and table name of `tableName` are copied into the filter fields of
 * `supp`. With group tags every entry must be a column node (TSDB_CODE_QRY_SYS_ERROR otherwise) and
 * the group-by flags are raised for the db-name and stable-name columns.
 */
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags == NULL) {
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
    return TSDB_CODE_SUCCESS;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, groupTags) {
    if (nodeType(pItem) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    const char* name = ((SColumnNode*)pItem)->colName;
    if (strcmp(name, GROUP_TAG_DB_NAME) == 0) {
      supp->groupByDbName = true;
    }
    if (strcmp(name, GROUP_TAG_STABLE_NAME) == 0) {
      supp->groupByStbName = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Fill `supp` from the plan inputs of a table count scan: grouping/filter mode first, then the output
 * slot ids (reset to -1 before they are looked up). The first failing step is logged and its code
 * returned; TSDB_CODE_SUCCESS otherwise.
 */
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t rc = tblCountScanGetInputs(groupTags, tableName, supp);
  if (rc != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return rc;
  }

  // -1 marks a column that is not part of the output
  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  rc = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (rc != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return rc;
  }

  rc = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (rc != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
  }
  return rc;
}
S
shenglian zhou 已提交
3126

S
slzhou 已提交
3127
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3128 3129 3130
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3131
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3132
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3133
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3134 3135 3136 3137 3138 3139 3140 3141 3142

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3143
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3144 3145
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3146 3147 3148
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3149 3150 3151

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3152 3153
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3165 3166 3167
/*
 * Write one result row (row 0) of the table count scan into pRes.
 *
 * pSupp    slot ids of the db-name, stable-name and count columns; -1 means the column is absent.
 * dbName   database name for the db-name column; must not be empty when that column is present.
 * stbName  super table name; an empty string stores NULL in the stable-name column.
 * count    table count stored in the count column.
 * pRes     result block; its row count is set to 1.
 *
 * Names are stored as var-strings in fixed-size buffers. The stored length is the length of the
 * bytes actually copied, so an over-long name is truncated consistently rather than announcing
 * more bytes than the buffer holds.
 */
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
    ASSERT(strlen(dbName));
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);

    char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);

    // length of what was stored, not of the (possibly longer) source string
    varDataSetLen(varDbName, strlen(varDataVal(varDbName)));
    colDataSetVal(colInfoData, 0, varDbName, false);
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
    if (strlen(stbName) != 0) {
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      // tstrncpy, as used for the db name above, always leaves the buffer NUL-terminated
      tstrncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
      varDataSetLen(varStbName, strlen(varDataVal(varStbName)));
      colDataSetVal(colInfoData, 0, varStbName, false);
    } else {
      colDataSetNULL(colInfoData, 0);
    }
  }

  if (pSupp->tbCountSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
    colDataSetVal(colInfoData, 0, (char*)&count, false);
  }
  pRes->info.rows = 1;
}

S
slzhou 已提交
3197
/*
 * Produce the next table-count result for the system databases.
 *
 * Fetches the table counts via getInfosDbMeta() / getPerfDbMeta() and dispatches to the grouped
 * builder (when grouping by db name or stable name) or to the filter builder. Returns pInfo->pRes
 * when it holds rows, NULL otherwise.
 */
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
  STableCountScanSupp* pSupp = &pInfo->supp;
  SSDataBlock*         pRes = pInfo->pRes;

  size_t numInfoTables;
  size_t numPerfTables;
  getInfosDbMeta(NULL, &numInfoTables);
  getPerfDbMeta(NULL, &numPerfTables);

  bool grouped = pSupp->groupByDbName || pSupp->groupByStbName;
  if (!grouped) {
    buildSysDbFilterTableCount(pOperator, pSupp, pRes, numInfoTables, numPerfTables);
  } else {
    buildSysDbGroupedTableCount(pOperator, pInfo, pSupp, pRes, numInfoTables, numPerfTables);
  }

  if (pRes->info.rows > 0) {
    return pRes;
  }
  return NULL;
}

S
slzhou 已提交
3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230
/*
 * Non-grouped table count for the system databases.
 *
 * Depending on pSupp->dbNameFilter a single row is written: the count of the information schema, the
 * count of the performance schema, or - for an empty filter - the sum of both under an empty db name.
 * Any other filter writes nothing. The operator is marked completed in every case.
 */
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  char*  matchedDb = NULL;
  size_t numOfTables = 0;

  if (strcmp(pSupp->dbNameFilter, TSDB_INFORMATION_SCHEMA_DB) == 0) {
    matchedDb = TSDB_INFORMATION_SCHEMA_DB;
    numOfTables = infodbTableNum;
  } else if (strcmp(pSupp->dbNameFilter, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
    matchedDb = TSDB_PERFORMANCE_SCHEMA_DB;
    numOfTables = perfdbTableNum;
  } else if (strlen(pSupp->dbNameFilter) == 0) {
    matchedDb = "";
    numOfTables = infodbTableNum + perfdbTableNum;
  }

  if (matchedDb != NULL) {
    fillTableCountScanDataBlock(pSupp, matchedDb, "", numOfTables, pRes);
  }
  setOperatorCompleted(pOperator);
}

// Emits the grouped table count for the system databases, one group per call,
// driven by pInfo->currGrpIdx:
//
//   index 0   -> row for TSDB_INFORMATION_SCHEMA_DB (infodbTableNum)
//   index 1   -> row for TSDB_PERFORMANCE_SCHEMA_DB (perfdbTableNum)
//   otherwise -> no row; the operator is marked as completed
//
// The group id is computed from the db name when grouping by db name, and from
// an empty key otherwise.  pInfo->currGrpIdx is advanced on every call.
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  switch (pInfo->currGrpIdx) {
    case 0: {
      uint64_t gid = pSupp->groupByDbName
                         ? calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB))
                         : calcGroupId("", 0);
      pRes->info.id.groupId = gid;
      fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
      break;
    }
    case 1: {
      uint64_t gid = pSupp->groupByDbName
                         ? calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB))
                         : calcGroupId("", 0);
      pRes->info.id.groupId = gid;
      fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
      break;
    }
    default:
      // Both groups have already been produced.
      setOperatorCompleted(pOperator);
      break;
  }

  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3256
// Entry point of the table-count scan operator: produces the next result
// block, or NULL when there is nothing (more) to return.
//
// The operator-owned result block (pInfo->pRes) is reset on every call, before
// the completion check.  When pInfo->readHandle.mnd is set the counts come
// from the system-database path (buildSysDbTableCount); otherwise they come
// from the vnode path (buildVnodeDbTableCount).
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
  STableCountScanOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*                 pRes = pInfo->pRes;

  blockDataCleanup(pRes);

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  if (pInfo->readHandle.mnd != NULL) {
    return buildSysDbTableCount(pOperator, pInfo);
  }

  return buildVnodeDbTableCount(pOperator, pInfo, &pInfo->supp, pRes);
}

// Builds the table-count result for the database served by this vnode.
//
// The db name and vgroup id are obtained from vnodeGetInfo(); the name is then
// parsed with the T_NAME_ACCT | T_NAME_DB flags and reduced to the plain db
// name before the count rows are produced.  Depending on the grouping flags in
// pSupp, either the grouped or the filtered builder fills pRes.
//
// Returns pRes when it holds at least one row, NULL otherwise.
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
  char        shortDbName[TSDB_DB_NAME_LEN] = {0};
  const char* fullDbName = NULL;
  int32_t     vnodeId = 0;
  SName       parsedName = {0};

  // NOTE(review): the return values of the name helpers are not checked; on
  // failure shortDbName would presumably stay empty -- confirm if that matters.
  vnodeGetInfo(pInfo->readHandle.vnode, &fullDbName, &vnodeId);
  tNameFromString(&parsedName, fullDbName, T_NAME_ACCT | T_NAME_DB);
  tNameGetDbName(&parsedName, shortDbName);

  bool grouped = pSupp->groupByDbName || pSupp->groupByStbName;
  if (!grouped) {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, shortDbName);
  } else {
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vnodeId, shortDbName);
  }

  if (pRes->info.rows > 0) {
    return pRes;
  }
  return NULL;
}

// Produces the next group of the grouped table count for one vnode.
//
// Without stb grouping there is a single group: the total table count of the
// vnode, keyed by the db name; the operator is completed right away.
//
// With stb grouping the groups are emitted one per call, indexed by
// pInfo->currGrpIdx: first one group per entry of pInfo->stbUidList (loaded
// lazily on the first call), then one group built by
// buildVnodeGroupedNtbTableCount(), and after that the operator is completed.
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  if (!pSupp->groupByStbName) {
    pRes->info.id.groupId = calcGroupId(dbName, strlen(dbName));

    int64_t numOfTables = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", numOfTables, pRes);
    setOperatorCompleted(pOperator);
    return;
  }

  if (pInfo->stbUidList == NULL) {
    pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
    // A failure here is only logged; the scan continues with whatever the list holds.
    if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
      qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
    }
  }

  if (pInfo->currGrpIdx > taosArrayGetSize(pInfo->stbUidList)) {
    // Every stb group and the trailing ntb group have been emitted already.
    setOperatorCompleted(pOperator);
    return;
  }

  if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
    tb_uid_t* pUid = taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
    buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, *pUid);
  } else {
    // Index equals the list size: emit the ntb group that follows the stb groups.
    buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);
  }
  pInfo->currGrpIdx++;
}

// Fills pRes with a single, ungrouped table-count row for this vnode and marks
// the operator as completed.
//
// When both a db name filter and an stb name filter are present, the row
// carries the ctbNum statistic of that stb.  In every other case the row
// carries the total table count of the vnode with an empty stb name.
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  bool filterByStb = (pSupp->dbNameFilter[0] != '\0') && (pSupp->stbNameFilter[0] != '\0');

  if (filterByStb) {
    SMetaStbStats stbStats = {0};
    tb_uid_t      stbUid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter);

    // stbStats starts zeroed, so the count is 0 if the lookup does not fill it in.
    metaGetStbStats(pInfo->readHandle.meta, stbUid, &stbStats);

    int64_t numOfChildTables = stbStats.ctbNum;
    fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numOfChildTables, pRes);
  } else {
    int64_t numOfTables = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", numOfTables, pRes);
  }

  setOperatorCompleted(pOperator);
}

// Emits the group for the count returned by metaGetNtbNum() on this vnode.
//
// The group key is "<dbName>." when grouping by db name and the empty string
// otherwise.  The group id is always stored in pRes, but a row is written only
// when the count is non-zero.
static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName) {
  char groupKey[TSDB_TABLE_FNAME_LEN] = {0};
  if (pSupp->groupByDbName) {
    snprintf(groupKey, sizeof(groupKey), "%s.%s", dbName, "");
  }

  pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));

  int64_t numOfNtbs = metaGetNtbNum(pInfo->readHandle.meta);
  if (numOfNtbs == 0) {
    return;
  }

  fillTableCountScanDataBlock(pSupp, dbName, "", numOfNtbs, pRes);
}

// Emits the group for one stb of this vnode.
//
// The stb name is resolved from stbUid.  The group key is "<dbName>.<stbName>"
// when grouping by db name and "<stbName>" otherwise.  The row written to pRes
// carries the ctbNum statistic reported for that stb.
static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
  char stbName[TSDB_TABLE_NAME_LEN] = {0};
  char groupKey[TSDB_TABLE_FNAME_LEN] = {0};

  metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);

  if (!pSupp->groupByDbName) {
    snprintf(groupKey, sizeof(groupKey), "%s", stbName);
  } else {
    snprintf(groupKey, sizeof(groupKey), "%s.%s", dbName, stbName);
  }
  pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));

  // stbStats starts zeroed, so the count is 0 if the lookup does not fill it in.
  SMetaStbStats stbStats = {0};
  metaGetStbStats(pInfo->readHandle.meta, stbUid, &stbStats);

  int64_t numOfChildTables = stbStats.ctbNum;
  fillTableCountScanDataBlock(pSupp, dbName, stbName, numOfChildTables, pRes);
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3381
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3382 3383
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3384
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3385 3386
  taosMemoryFreeClear(param);
}