scanoperator.c 122.8 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33 34

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

// Mark the scan state pointed to by _info as a reverse scan by assigning REVERSE_SCAN to its scanFlag member.
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
35
// Toggle n in place: TSDB_ORDER_ASC becomes TSDB_ORDER_DESC, any other value becomes TSDB_ORDER_ASC.
// n must be an lvalue and is evaluated more than once, so do not pass an expression with side effects.
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
36

H
Haojun Liao 已提交
37 38 39 40 41 42 43 44 45 46 47 48
// Execution statistics bundle for the table merge scan: one file-block load recorder plus one sort exec info.
// NOTE(review): presumably reported back as the operator's explain/analyze data - confirm against the users.
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;  // block load counters (same recorder type the table scan uses)
  SSortExecInfo          sortExecInfo;   // sort statistics
} STableMergeScanExecInfo;

// Per-source parameter block for the table merge scan.
// NOTE(review): field meanings below are inferred from names and types only - confirm against the sort callbacks.
typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;   // operator this source belongs to
  int32_t        readerIdx;   // presumably the index of the reader feeding this source
  uint64_t       uid;         // presumably the table uid of this source
  SSDataBlock*   inputBlock;  // block handed to the sorter for this source
} STableMergeScanSortSourceParam;

L
Liu Jicong 已提交
49
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
50

H
Haojun Liao 已提交
51
// Sampling hook of the table scan: tells the caller whether the current data block should be processed.
// The probabilistic branch is compiled out, so the function accepts every block and does not read pInfo.
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#endif
  // sampling disabled: keep the block
  return true;
}

64
// Flip the order field of the first numOfOutput function contexts in pCtx (ascending <-> descending).
// Nothing is touched when numOfOutput is not positive.
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SqlFunctionCtx* pCur = &pCtx[idx];
    SWITCH_ORDER(pCur->order);
    idx += 1;
  }
}

70 71 72 73 74 75 76 77 78
// Move *tw to the adjacent window in the direction given by order.
//  - For every interval unit except 'n' and 'y', skey is shifted by sliding * direction factor and ekey becomes
//    skey + interval - 1.
//  - For 'n' and 'y' (a 'y' interval is converted to 12x as many months) the window start is converted to local
//    calendar time, the month index is moved by interval * direction factor, and both window boundaries are
//    rebuilt through taosMktime(); ekey is the start of the following window minus one tick.
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int32_t direction = GET_FORWARD_DIRECTION_FACTOR(order);
  const bool    calendarUnit = (pInterval->intervalUnit == 'n') || (pInterval->intervalUnit == 'y');

  if (!calendarUnit) {
    tw->skey += pInterval->sliding * direction;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // width of the window expressed in months
  int64_t months = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    months *= 12;
  }

  // window start in seconds, then broken down into local calendar fields
  int64_t   startSec = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;
  time_t    startTime = (time_t)startSec;
  struct tm cal;
  taosLocalTime(&startTime, &cal);

  // start of the next window
  int monthIdx = (int)(cal.tm_year * 12 + cal.tm_mon + months * direction);
  cal.tm_year = monthIdx / 12;
  cal.tm_mon = monthIdx % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&cal) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // one tick before the start of the window after that
  monthIdx = (int)(monthIdx + months);
  cal.tm_year = monthIdx / 12;
  cal.tm_mon = monthIdx % 12;
  tw->ekey =
      convertTimePrecision((int64_t)taosMktime(&cal) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision) - 1;
}

103
// Check whether the key range of a data block touches more than one interval window.
//
// The window aligned at the first key of the block in scan order (skey for ascending, ekey for descending) is
// computed; if the block reaches beyond that window, or a following window still intersects the block, the
// function returns true. It returns false when pInterval->interval is 0 (no interval operator upstream) or
// when the block lies inside a single window.
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  STimeWindow w = {0};

  // 0 by default, which means the upstream operator is not an interval operator.
  if (pInterval->interval == 0) {
    return false;
  }

  if (order == TSDB_ORDER_ASC) {
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
    ASSERT(w.ekey >= pBlockInfo->window.skey);

    // the block runs past the end of its first window
    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      ASSERT(w.ekey > pBlockInfo->window.ekey);
      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
        return true;
      }
    }
  } else {
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
    ASSERT(w.skey <= pBlockInfo->window.ekey);

    // the block starts before the beginning of its first (latest) window
    if (w.skey > pBlockInfo->window.skey) {
      return true;
    }

    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      // use the same assertion macro as the rest of this file
      ASSERT(w.skey < pBlockInfo->window.skey);
      if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
        return true;
      }
    }
  }

  return false;
}

154 155 156 157 158 159 160 161 162 163 164
// This function lets the table scanner look up the temporary result row that the upstream aggregate keeps
// for a table group.
//
// pOperator must be a table scan operator (any other operator type yields NULL). The row is looked up by
// groupId in the pushed-down aggregate support's result-row hash table. On success *pPage receives the buffer
// page holding the row and the row pointer inside that page is returned; the caller hands the page back with
// releaseBufPage(). NULL is returned when the group has no result row yet or when the buffer page cannot be
// obtained.
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  int64_t buf[2] = {0};
  SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId);

  STableScanInfo* pTableScanInfo = pOperator->info;

  SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pTableScanInfo->base.pdInfo.pAggSup->pResultRowHashTable,
                                                               buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));

  if (p1 == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId);
  if (*pPage == NULL) {
    // without the page there is no row to point at; do not fabricate a pointer from NULL + offset
    return NULL;
  }

  return (SResultRow*)((char*)(*pPage) + p1->offset);
}

// Ask every pushed-down function whether, given the group's current intermediate result, it still needs the
// data of this block. When none of them does, *status is set to FUNC_DATA_REQUIRED_NOT_LOAD; otherwise
// *status is left untouched. Nothing happens when no expression support was pushed down or the group has no
// result row yet. Always returns TSDB_CODE_SUCCESS.
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScan = pOperator->info;
  SExprSupp*      pExprSup = pScan->base.pdInfo.pExprSup;
  if (pExprSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pResPage = NULL;
  SResultRow* pResRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pResPage);
  if (pResRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // stop at the first function that still requires the block
  bool dataNeeded = false;
  for (int32_t k = 0; (k < pExprSup->numOfExprs) && (!dataNeeded); ++k) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pResRow, k, pExprSup->rowEntryInfoOffset);

    int32_t required = fmFuncDynDataRequired(pExprSup->pCtx[k].functionId, pEntry, &pBlockInfo->window);
    dataNeeded = (required != FUNC_DATA_REQUIRED_NOT_LOAD);
  }

  // hand the result page back to the buffer
  releaseBufPage(pScan->base.pdInfo.pAggSup->pResultBuf, pResPage);

  if (!dataNeeded) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
215
// Evaluate the filter against the per-column block statistics (SMA).
// Returns the result of filterRangeExecute(); when either the filter or the statistics are missing the block
// cannot be judged and true (keep the block) is returned.
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                               int32_t numOfRows) {
  const bool canEvaluate = (pFilterInfo != NULL) && (pColsAgg != NULL);
  if (!canEvaluate) {
    return true;
  }

  return filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
}

H
Haojun Liao 已提交
225
// Retrieve the block statistics (SMA) of the current data block into pBlock.
// Returns true only when the reader reports that all columns have statistics. A failure code from the reader
// aborts the task through T_LONG_JMP and does not return here.
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  bool    everyColumnHasAgg = true;
  int32_t rc = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, pBlock, &everyColumnHasAgg);
  if (rc != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }

  return everyColumnHasAgg;
}

H
Haojun Liao 已提交
238
// Fill the tag / pseudo columns of pBlock for `rows` rows when the scan has pseudo-column expressions.
// Any error other than TSDB_CODE_PAR_TABLE_NOT_EXIST aborts the task through T_LONG_JMP; terrno is cleared
// afterwards. Does nothing when there are no pseudo-column expressions.
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
                               int32_t rows) {
  SExprSupp* pPseudo = &pTableScanInfo->pseudoSup;
  if (pPseudo->numOfExprs <= 0) {
    return;
  }

  int32_t rc = addTagPseudoColumnData(&pTableScanInfo->readHandle, pPseudo->pExprInfo, pPseudo->numOfExprs, pBlock,
                                      rows, GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);

  // a missing table is tolerated, since the table may have been dropped while the scan was running
  const bool tolerated = (rc == TSDB_CODE_SUCCESS) || (rc == TSDB_CODE_PAR_TABLE_NOT_EXIST);
  if (!tolerated) {
    T_LONG_JMP(pTaskInfo->env, rc);
  }

  // reset the error code.
  terrno = 0;
}

255 256
// todo handle the slimit info
// Apply the remaining OFFSET and the LIMIT stored in pLimitInfo to pBlock, in place.
//  - While an offset is pending, the block is either emptied (offset covers the whole block) or its leading
//    rows are trimmed and the pending offset is cleared.
//  - When a limit is set (limit != -1) and this block reaches it, only the rows that still fit are kept and
//    the operator status is set to OP_EXEC_DONE.
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
  const char* id = GET_TASKID(pTaskInfo);
  SLimit*     pLimit = &pLimitInfo->limit;

  if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
    if (pLimitInfo->remainOffset < pBlock->info.rows) {
      // only the head of this block is swallowed by the offset
      blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
      pLimitInfo->remainOffset = 0;
    } else {
      pLimitInfo->remainOffset -= pBlock->info.rows;
      blockDataEmpty(pBlock);
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
    }
  }

  const bool limitReached =
      (pLimit->limit != -1) && (pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows));
  if (!limitReached) {
    return;
  }

  // limit the output rows
  int32_t excessRows = pLimitInfo->numOfOutputRows + pBlock->info.rows - pLimit->limit;
  int32_t keptRows = pBlock->info.rows - excessRows;
  blockDataKeepFirstNRows(pBlock, keptRows);

  qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
  pOperator->status = OP_EXEC_DONE;
}

H
Haojun Liao 已提交
282
// Decide how much of the current data block has to be read, read it, and post-process it.
//
// *status starts from the scan's dataBlockLoadFlag and is forced to FUNC_DATA_REQUIRED_DATA_LOAD when the
// operator has a filter or the block overlaps several interval windows. Depending on *status the block is
//   - dropped (FILTEROUT),
//   - skipped with only the tag columns filled for one row (NOT_LOAD),
//   - answered from block statistics (SMA_LOAD, falling back to a full load when statistics are incomplete),
//   - or fully loaded (DATA_LOAD): statistics-based filtering and dynamic pruning may still discard it, in
//     which case *status becomes FILTEROUT; otherwise the data is retrieved, tag columns are filled, the
//     filter is run and limit/offset is applied.
// The read recorder of the scan is updated along the way. Returns TSDB_CODE_SUCCESS, or terrno when the block
// data cannot be retrieved; some callees abort the task through T_LONG_JMP instead of returning.
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SFileBlockLoadRecorder* pStat = &pTableScanInfo->readRecorder;
  SDataBlockInfo*         pBlockInfo = &pBlock->info;
  const bool              hasFilter = (pOperator->exprSupp.pFilterInfo != NULL);

  pStat->totalBlocks += 1;
  pStat->totalRows += pBlockInfo->rows;

  *status = pTableScanInfo->dataBlockLoadFlag;
  if (hasFilter || overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, pBlockInfo, pTableScanInfo->cond.order)) {
    *status = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  // drop statistics left over from a previous block
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pStat->filterOutBlocks += 1;
    // NOTE(review): the rows of this block were already added to totalRows above, so this looks like a
    // double count - confirm before relying on totalRows for filtered-out blocks.
    pStat->totalRows += pBlockInfo->rows;
    return TSDB_CODE_SUCCESS;
  }

  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
    pStat->skipBlocks += 1;
    return TSDB_CODE_SUCCESS;
  }

  bool smaAttempted = false;
  if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
    pStat->loadBlockStatis += 1;
    smaAttempted = true;  // remember that the statistics were already requested once

    if (doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
      return TSDB_CODE_SUCCESS;
    }

    // statistics are not available for every column: load the data block instead
    qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
    *status = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (hasFilter && (!smaAttempted)) {
    if (doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
      size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      bool   qualified =
          doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, numOfCols, pBlockInfo->rows);
      if (!qualified) {
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
               pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
        pStat->filterOutBlocks += 1;
        *status = FUNC_DATA_REQUIRED_FILTEROUT;

        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results
  doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pStat->skipBlocks += 1;

    *status = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }

  pStat->totalCheckedRows += pBlockInfo->rows;
  pStat->loadBlocks += 1;

  SSDataBlock* pLoaded = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pLoaded == NULL) {
    return terrno;
  }

  ASSERT(pLoaded == pBlock);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlockInfo->rows);

  // take the unfiltered row count back out; the final count is added again below
  pStat->totalRows -= pBlockInfo->rows;

  if (hasFilter) {
    int64_t beginUs = taosGetTimestampUs();
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);

    double elapsedMs = (taosGetTimestampUs() - beginUs) / 1000.0;
    pStat->filterTime += elapsedMs;

    if (pBlockInfo->rows != 0) {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), elapsedMs);
    } else {
      pStat->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, elapsedMs);
    }
  }

  applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator);

  pStat->totalRows += pBlockInfo->rows;
  pTableScanInfo->limitInfo.numOfOutputRows = pStat->totalRows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
397
// Switch the scan state to a descending (reverse) pass: the scan flag becomes REVERSE_SCAN, the order of the
// first numOfOutput function contexts is flipped, the query condition order is set to TSDB_ORDER_DESC and the
// start/end keys of the condition's time window are swapped.
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  // function contexts first, then the scan's own state
  switchCtxOrder(pCtx, numOfOutput);
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);

  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
  TSWAP(pTableScanInfo->cond.twindows.skey, pTableScanInfo->cond.twindows.ekey);
}

406 407
typedef struct STableCachedVal {
  const char* pName;
408
  STag*       pTags;
409 410
} STableCachedVal;

411 412 413 414 415 416 417 418 419 420 421
// Release a STableCachedVal together with the name and tag buffers it holds. A NULL argument is ignored.
static void freeTableCachedVal(void* param) {
  STableCachedVal* pCached = param;
  if (pCached != NULL) {
    taosMemoryFree((void*)pCached->pName);
    taosMemoryFree(pCached->pTags);
    taosMemoryFree(pCached);
  }
}

H
Haojun Liao 已提交
422 423 424 425 426 427 428
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
  pVal->pName = strdup(pMetaReader->me.name);
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
429
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
430 431 432 433 434 435 436
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

437 438
// Deleter handed to the table-meta LRU cache: only the cached value needs to be released, the key and its
// length are not used.
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  freeTableCachedVal(value);
}
439

440 441
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
442
  // currently only the tbname pseudo column
443
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
444
    return TSDB_CODE_SUCCESS;
445 446
  }

447 448
  int32_t code = 0;

449 450 451 452
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

453
  bool            freeReader = false;
454
  STableCachedVal val = {0};
455 456

  SMetaReader mr = {0};
457
  LRUHandle*  h = NULL;
458

459
  // 1. check if it is existed in meta cache
460
  if (pCache == NULL) {
461
    metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
462
    code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
463
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
464
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
S
slzhou 已提交
465 466
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);
H
Haojun Liao 已提交
467
      } else {
S
slzhou 已提交
468 469
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
H
Haojun Liao 已提交
470
      }
471 472 473 474 475
      metaReaderClear(&mr);
      return terrno;
    }

    metaReaderReleaseLock(&mr);
476

477 478
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
479 480

    freeReader = true;
481
  } else {
482 483
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
484
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
485 486
    if (h == NULL) {
      metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
487
      code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
488
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
489
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
490
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
491
                pBlock->info.id.uid, tstrerror(terrno), idStr);
H
Haojun Liao 已提交
492
        } else {
H
Haojun Liao 已提交
493
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
494
                 idStr);
H
Haojun Liao 已提交
495
        }
496 497 498 499 500 501
        metaReaderClear(&mr);
        return terrno;
      }

      metaReaderReleaseLock(&mr);

H
Haojun Liao 已提交
502
      STableCachedVal* pVal = createTableCacheVal(&mr);
503

H
Haojun Liao 已提交
504
      val = *pVal;
505
      freeReader = true;
H
Haojun Liao 已提交
506

H
Haojun Liao 已提交
507
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
508
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
509 510 511 512 513 514 515 516
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
517

H
Haojun Liao 已提交
518
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
519
    }
H
Haojun Liao 已提交
520

521 522
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
523
  }
524

525 526
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
527
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
528 529

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
530
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
531

532
    int32_t functionId = pExpr1->pExpr->_function.functionId;
533 534 535

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
536
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
537
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
538
      STagVal tagVal = {0};
539 540
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
541

542 543 544 545
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
546
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
547
      }
548

H
Haojun Liao 已提交
549 550 551
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
        colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
552
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
H
Haojun Liao 已提交
553
        colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
H
Haojun Liao 已提交
554 555 556
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
L
Liu Jicong 已提交
557
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
558
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
H
Haojun Liao 已提交
559
          colDataAppend(pColInfoData, i, data, false);
H
Haojun Liao 已提交
560
        }
561 562 563 564
      }
    }
  }

565 566
  // restore the rows
  pBlock->info.rows = backupRows;
567 568 569 570
  if (freeReader) {
    metaReaderClear(&mr);
  }

H
Haojun Liao 已提交
571
  return TSDB_CODE_SUCCESS;
572 573
}

H
Haojun Liao 已提交
574
// Write the table name pseudo column into pColInfoData.
// The name is packed as a var-string into a temporary one-row VARCHAR column, which is passed together with
// pBlock->info.rows as the input of the scalar function registered for functionId; pColInfoData is the output
// of that call. When no process callback is registered for functionId an error is logged and the output
// column is left as it is. The temporary column is destroyed before returning.
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
  struct SScalarFuncExecFuncs execFuncs = {0};
  fmGetScalarFuncExecFuncs(functionId, &execFuncs);

  // var-string form of the table name
  char   buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  size_t bytes = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
  STR_TO_VARSTR(buf, name)

  // one-row source column holding the name
  SColumnInfoData nameCol = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, bytes, 1);
  colInfoDataEnsureCapacity(&nameCol, 1, false);
  colDataAppend(&nameCol, 0, buf, false);

  SScalarParam input = {.columnData = &nameCol, .numOfRows = pBlock->info.rows};
  SScalarParam output = {.columnData = pColInfoData};

  if (execFuncs.process == NULL) {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  } else {
    execFuncs.process(&input, 1, &output);
  }

  colDataDestroy(&nameCol);
}

599
// Pull data blocks from the reader until one survives loading and filtering, and return it.
// For every block the group id is resolved from the table uid and loadDataBlock() is invoked; blocks that end
// up filtered out or empty are skipped. Before a block is returned the operator's row count and cost are
// refreshed from the read recorder and the stream's last status is set to the block's uid and end key.
// Returns NULL once the reader has no more blocks. A killed task or a load error aborts through T_LONG_JMP.
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo*         pScan = pOperator->info;
  STableScanBase*         pBase = &pScan->base;
  SFileBlockLoadRecorder* pRecorder = &pBase->readRecorder;
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*            pBlock = pScan->pResBlock;

  const int64_t startUs = taosGetTimestampUs();

  while (tsdbNextDataBlock(pBase->dataReader)) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pScan->sample)) {
      continue;
    }

    ASSERT(pBlock->info.id.uid != 0);
    pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);

    uint32_t status = 0;
    int32_t  code = loadDataBlock(pOperator, pBase, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pOperator->pTaskInfo->env, code);
    }

    // current block is filter out according to filter condition, continue load the next block
    const bool nothingToEmit = (status == FUNC_DATA_REQUIRED_FILTEROUT) || (pBlock->info.rows == 0);
    if (nothingToEmit) {
      continue;
    }

    pOperator->resultInfo.totalRows = pRecorder->totalRows;
    pRecorder->elapsedTime += (taosGetTimestampUs() - startUs) / 1000.0;

    pOperator->cost.totalCost = pRecorder->elapsedTime;

    // todo refactor
    pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
    pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
    pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;

    ASSERT(pBlock->info.id.uid != 0);
    return pBlock;
  }

  return NULL;
}

H
Haojun Liao 已提交
650
// Drive the scan passes over the tables currently set on the reader and return the next block.
// scanInfo.numOfAsc ascending passes run first, followed by scanInfo.numOfDesc descending passes; scanTimes
// counts the finished passes. Whenever a pass is exhausted and another one follows, the reader is reset with
// the current query condition. Returns NULL when the reader does not exist, the operator is already done, or
// all passes are finished.
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  STableScanBase* pBase = &pScan->base;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pBase->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // do the ascending order traverse in the first place.
  while (pScan->scanTimes < pScan->scanInfo.numOfAsc) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pScan->scanTimes += 1;
    if (pScan->scanTimes >= pScan->scanInfo.numOfAsc) {
      continue;  // last ascending pass finished, the loop condition ends the loop
    }

    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = REPEAT_SCAN;
    qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));

    // do prepare for the next round table scan operation
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
  }

  int32_t total = pScan->scanInfo.numOfAsc + pScan->scanInfo.numOfDesc;
  if (pScan->scanTimes >= total) {
    return NULL;
  }

  // first descending pass: turn the scan around once
  if (pBase->cond.order == TSDB_ORDER_ASC) {
    // NOTE(review): numOfOutput is passed as 0, so no function context order is flipped here - confirm intended.
    prepareForDescendingScan(pBase, pOperator->exprSupp.pCtx, 0);
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
  }

  while (pScan->scanTimes < total) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pScan->scanTimes += 1;
    if (pScan->scanTimes >= total) {
      continue;  // all passes finished
    }

    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = REPEAT_SCAN;

    qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
  }

  return NULL;
}

// Produce the next result block of the table scan operator, or NULL when nothing is left.
//
// Two modes are supported, selected by pInfo->scanMode:
//  - TABLE_SCAN__TABLE_ORDER: tables are scanned one by one; when the current table yields no
//    more data the reader is re-targeted at the next table of the task's table list.
//  - otherwise: tables are scanned group by group; the reader is opened lazily for the first
//    group and re-targeted for each following group.
//
// A failure to open the reader aborts the task through T_LONG_JMP.
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // scan table one by one sequentially
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);

    while (1) {
      SSDataBlock* result = doGroupedTableScan(pOperator);
      if (result) {
        return result;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;
      if (pInfo->currentTable >= numOfTables) {
        return NULL;
      }

      STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
      tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
             pInfo->currentTable, pTaskInfo->id.str);

      tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;
    }
  }

  // scan table group by group sequentially
  if (pInfo->currentGroupId == -1) {
    // first invocation: move to group 0 and open the reader on its tables
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    int32_t        num = 0;
    STableKeyInfo* pList = NULL;
    tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
    ASSERT(pInfo->base.dataReader == NULL);

    int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
                                  (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  SSDataBlock* result = doGroupedTableScan(pOperator);
  if (result != NULL) {
    ASSERT(result->info.id.uid != 0);
    return result;
  }

  // The current group is exhausted. Keep advancing until a group produces data or all groups
  // have been visited; stopping after the first empty group would silently drop the data of
  // every group that follows it.
  while (1) {
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    // reset value for the next group data output
    pOperator->status = OP_OPENED;
    pInfo->base.limitInfo.numOfOutputRows = 0;
    pInfo->base.limitInfo.remainOffset = pInfo->base.limitInfo.limit.offset;

    int32_t        num = 0;
    STableKeyInfo* pList = NULL;
    tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);

    tsdbSetTableList(pInfo->base.dataReader, pList, num);
    tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
    pInfo->scanTimes = 0;

    result = doGroupedTableScan(pOperator);
    if (result != NULL) {
      return result;
    }
  }
}

788 789
// Export a heap-allocated copy of the scan operator's file-block load recorder for EXPLAIN.
//
// pOptr         operator whose info is a STableScanInfo
// pOptrExplain  receives the newly allocated SFileBlockLoadRecorder (NULL on failure)
// len           receives the size in bytes of the returned record (0 on failure)
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the copy cannot be allocated.
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
  if (pRecorder == NULL) {
    // never dereference a failed allocation; leave the outputs in a well-defined state
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableScanInfo* pTableScanInfo = pOptr->info;
  *pRecorder = pTableScanInfo->base.readRecorder;
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

797
static void destroyTableScanOperatorInfo(void* param) {
798
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
H
Haojun Liao 已提交
799
  blockDataDestroy(pTableScanInfo->pResBlock);
H
Haojun Liao 已提交
800
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
H
Haojun Liao 已提交
801

H
Haojun Liao 已提交
802 803
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
804

H
Haojun Liao 已提交
805 806
  if (pTableScanInfo->base.matchInfo.pList != NULL) {
    taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
807
  }
L
Liu Jicong 已提交
808

H
Haojun Liao 已提交
809 810
  taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
D
dapan1121 已提交
811
  taosMemoryFreeClear(param);
812 813
}

814
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
815
                                           SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
816 817 818
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
819
    goto _error;
H
Haojun Liao 已提交
820 821
  }

822
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
823
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
824 825

  int32_t numOfCols = 0;
826
  int32_t code =
H
Haojun Liao 已提交
827
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
828 829 830 831
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
832
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
833
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
834
  if (code != TSDB_CODE_SUCCESS) {
835
    goto _error;
836 837
  }

H
Haojun Liao 已提交
838
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
839
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
840
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
841
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
842 843
  }

844
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
H
Haojun Liao 已提交
845 846

  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
847 848
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
849 850
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

851 852
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
853

H
Haojun Liao 已提交
854
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
855
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
856
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
857

H
Haojun Liao 已提交
858 859 860
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
861 862
  }

wmmhello's avatar
wmmhello 已提交
863
  pInfo->currentGroupId = -1;
864
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
865
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
866

L
Liu Jicong 已提交
867 868
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
869
  pOperator->exprSupp.numOfExprs = numOfCols;
870

H
Haojun Liao 已提交
871 872
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
873 874 875
    code = terrno;
    goto _error;
  }
876

H
Haojun Liao 已提交
877
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
878 879
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
880 881 882

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
883
  return pOperator;
884

885
_error:
886 887 888
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
889

890 891
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
892
  return NULL;
H
Haojun Liao 已提交
893 894
}

895
// Build a sequential table scan operator around an already opened reader.
//
// pReadHandle  reader stored as the operator's data reader
// pTaskInfo    owning task; on allocation failure its `code` field is set
//
// Returns the new operator, or NULL when memory cannot be allocated.
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // either allocation may have succeeded; release whichever did before bailing out
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->base.dataReader = pReadHandle;

  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
  return pOperator;
}

908
// Forget all buffered input blocks and rewind the read cursor to the first slot.
static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  pInfo->validBlockIndex = 0;
  taosArrayClear(pInfo->pBlockLists);
}

913
// True when the parent operator recorded in windowSup is a stream session window.
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION) {
    return true;
  }
  return false;
}

917
// True when the parent operator recorded in windowSup is a stream state window.
static bool isStateWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
    return true;
  }
  return false;
}
5
54liuyao 已提交
920

L
Liu Jicong 已提交
921
// True when the parent operator is any of the stream interval variants
// (plain, semi or final interval).
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
      return true;
    default:
      return false;
  }
}

// True only for the plain stream interval parent (not the semi/final variants).
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
    return true;
  }
  return false;
}

931 932 933 934
// True when the parent is an interval window whose sliding step differs from its interval.
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.sliding != pInfo->interval.interval;
}

935
// Load pInfo->groupId from row `rowIndex` of column `groupColIndex` of pBlock; the column
// data is read as an array of uint64_t.
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  const uint64_t*  pGroupIds = (const uint64_t*)pGroupCol->pData;

  ASSERT(rowIndex < pBlock->info.rows);
  pInfo->groupId = pGroupIds[rowIndex];
}

L
Liu Jicong 已提交
942
// Re-arm a table scan for a new time window: the scan round counter and current group are
// reset, the query window is replaced by *pWin, and the reader is closed so the handle is
// NULL for the next scan.
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;
  pTableScanInfo->base.cond.twindows = *pWin;

  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
}

L
Liu Jicong 已提交
950 951
// Read at most one data block of table `tbUid` restricted to [startTs, endTs] and to versions
// up to `maxVersion`, using a temporary reader and the scan operator's own result block.
//
// Returns the operator's result block when it holds rows afterwards, NULL otherwise.
// A failure to open the reader sets terrno and aborts the task through T_LONG_JMP.
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  STableScanInfo* pScan = pTableScanOp->info;
  SExecTaskInfo*  pTaskInfo = pTableScanOp->pTaskInfo;
  SSDataBlock*    pRes = pScan->pResBlock;

  STableKeyInfo tableKey = {.uid = tbUid, .groupId = 0};

  // start from the operator's condition, then narrow it by version and time range
  SQueryTableDataCond readCond = pScan->base.cond;
  readCond.startVersion = -1;
  readCond.endVersion = maxVersion;
  readCond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  STsdbReader* pTmpReader = NULL;
  int32_t      code = tsdbReaderOpen(pScan->base.readHandle.vnode, &readCond, &tableKey, 1, pRes,
                                     (STsdbReader**)&pTmpReader, GET_TASKID(pTaskInfo));
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  if (tsdbNextDataBlock(pTmpReader)) {
    tsdbRetrieveDataBlock(pTmpReader, NULL);
    doSetTagColumnData(&pScan->base, pRes, pTaskInfo, pRes->info.rows);
    pRes->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pRes->info.id.uid);
  }

  tsdbReaderClose(pTmpReader);
  qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
         ", suid:%" PRIu64,
         pRes->info.rows, startTs, endTs, tbUid, maxVersion, readCond.suid);

  if (pRes->info.rows > 0) {
    return pRes;
  }
  return NULL;
}

// Compute the group id of the row of table `uid` at timestamp `ts` (as of `maxVersion`) from
// its column data. Returns 0 when no such row can be read.
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pRow = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (pRow == NULL || pRow->info.rows == 0) {
    return 0;
  }

  // a single timestamp is expected to match exactly one row
  ASSERT(pRow->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pRow, 0);
}

5
54liuyao 已提交
996
// Look up the group id of table `uid` in the table list of the scan operator's task.
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  SExecTaskInfo* pScanTask = pInfo->pTableScanOp->pTaskInfo;
  return getTableGroupId(pScanTask->pTableInfoList, uid);
}

5
54liuyao 已提交
1000 1001 1002 1003 1004 1005 1006 1007
// Resolve the group id of a row: computed from column data when partitionSup.needCalc is set,
// otherwise taken from the table list by uid.
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1008
// Consume the next range description from pBlock, starting at *pRowIndex, and re-arm the
// table scan operator for it.
//
// Following rows of the same group that share the window start, or whose end equals the
// window start, are merged into the window and consumed as well. On return *pRowIndex points
// at the first row that was not consumed.
//
// Returns false when pBlock has no row left to process, true otherwise.
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  if (pBlock->info.rows == 0 || (*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  TSKEY*    startData = (TSKEY*)pStartCol->pData;
  TSKEY*    endData = (TSKEY*)pEndCol->pData;
  uint64_t* gpData = (uint64_t*)pGroupCol->pData;
  TSKEY*    calStartData = (TSKEY*)pCalStartCol->pData;
  TSKEY*    calEndData = (TSKEY*)pCalEndCol->pData;

  // the row at the incoming index seeds the scan window and the group
  const int32_t first = *pRowIndex;
  STimeWindow   win = {.skey = startData[first], .ekey = endData[first]};
  uint64_t      groupId = gpData[first];

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, first);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[first];
    pInfo->updateWin.ekey = calEndData[first];
  }

  for ((*pRowIndex)++; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
    const int32_t cur = *pRowIndex;
    const bool    sameGroup = (gpData[cur] == groupId);

    if (sameGroup && startData[cur] == win.skey) {
      // same start: extend the window to the later end
      win.ekey = TMAX(win.ekey, endData[cur]);
      continue;
    }

    if (sameGroup && endData[cur] == win.skey) {
      // row ends where the window starts: extend the window backwards
      win.skey = TMIN(win.skey, startData[cur]);
      continue;
    }

    ASSERT(!(win.skey > startData[cur] && win.ekey < endData[cur]) ||
           !(isInTimeWindow(&win, startData[cur], 0) || isInTimeWindow(&win, endData[cur], 0)));
    break;
  }

  resetTableScanInfo(pInfo->pTableScanOp->info, &win);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1057
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1058
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1059
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1060
  dumyInfo.cur.pageId = -1;
1061
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1062 1063
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1064
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1065

5
54liuyao 已提交
1066
  while (1) {
1067 1068 1069
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1070
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1071 1072 1073 1074 1075
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1076
    }
5
54liuyao 已提交
1077

5
54liuyao 已提交
1078 1079 1080
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1081
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1082
    endWin = preWin;
5
54liuyao 已提交
1083
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1084 1085 1086 1087 1088 1089
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1090

L
Liu Jicong 已提交
1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
// Return the next non-empty block for the ranges described by pSDB, restricted to the group
// currently stored in pInfo->groupId; the block's calWin is set to pInfo->updateWin.
//
// When the scan operator yields nothing even after trying to arm the next range, pSDB is
// cleared, *pRowIndex is rewound to 0, updateWin is reset to the full time range, the scan
// reader is closed, and NULL is returned.
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  for (;;) {
    SSDataBlock* pRes = doTableScan(pInfo->pTableScanOp);
    if (pRes == NULL && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
      // scan next window data
      pRes = doTableScan(pInfo->pTableScanOp);
    }

    if (pRes == NULL) {
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};

      STableScanInfo* pScan = pInfo->pTableScanOp->info;
      tsdbReaderClose(pScan->base.dataReader);
      pScan->base.dataReader = NULL;
      return NULL;
    }

    doFilter(pRes, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
    if (pRes->info.rows == 0) {
      continue;
    }

    if (!pInfo->partitionSup.needCalc) {
      // the group id is carried by the block itself
      if (pRes->info.id.groupId == pInfo->groupId) {
        pRes->info.calWin = pInfo->updateWin;
        return pRes;
      }
      continue;
    }

    // the group id depends on row data: rebuild pRes from the rows that belong to the group
    SSDataBlock* pCopy = createOneDataBlock(pRes, true);
    blockDataCleanup(pRes);

    for (int32_t row = 0; row < pCopy->info.rows; row++) {
      if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pCopy, row) != pInfo->groupId) {
        continue;
      }

      for (int32_t col = 0; col < pInfo->pTableScanOp->exprSupp.numOfExprs; col++) {
        SColumnInfoData* pFrom = taosArrayGet(pCopy->pDataBlock, col);
        SColumnInfoData* pTo = taosArrayGet(pRes->pDataBlock, col);
        bool             isNull = colDataIsNull(pFrom, pCopy->info.rows, row, NULL);
        char*            pVal = colDataGetData(pFrom, row);
        colDataAppend(pTo, pRes->info.rows, pVal, isNull);
      }
      pRes->info.rows++;
    }

    blockDataDestroy(pCopy);

    if (pRes->info.rows > 0) {
      pRes->info.calWin = pInfo->updateWin;
      return pRes;
    }
  }
}
1142

1143
// Translate each (start, end, uid) row of pSrcBlock into a scan range covering the session
// windows that currently contain the row's start and end timestamps.
//
// Rows whose start timestamp has no valid session window are skipped. Every emitted row is
// written at the next free slot of pDestBlock, so the first `info.rows` rows of the result are
// always populated even when source rows are skipped. The uid and calculate-start/end columns
// of the result are set to NULL.
//
// Returns TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity.
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int64_t          version = pSrcBlock->info.version - 1;

  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }

    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    ASSERT(!IS_INVALID_SESSION_WIN_KEY(endWin));

    // append at the output cursor, not at the source index: skipped source rows must not
    // leave unwritten slots in the destination block
    int32_t destRow = pDestBlock->info.rows;
    colDataAppend(pDestStartCol, destRow, (const char*)&startWin.win.skey, false);
    colDataAppend(pDestEndCol, destRow, (const char*)&endWin.win.ekey, false);

    colDataAppendNULL(pDestUidCol, destRow);
    colDataAppend(pDestGpCol, destRow, (const char*)&groupId, false);
    colDataAppendNULL(pDestCalStartTsCol, destRow);
    colDataAppendNULL(pDestCalEndTsCol, destRow);
    pDestBlock->info.rows++;
  }

  return TSDB_CODE_SUCCESS;
}
1190 1191 1192 1193 1194 1195

// Translate the (start, end, uid, groupId) rows of pSrcBlock into interval-aligned scan ranges
// in pDestBlock, one output row per window returned by getSlidingWindow.
//
// When group ids must be computed from data (partitionSup.needCalc) and the first source row
// spans a time range, pSrcBlock is first rewritten in place: the range is read back at the
// previous version and replaced by one row per stored row, each carrying its computed group
// id. If nothing can be read back there is nothing to rescan and the result stays empty.
//
// Returns TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity.
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);

  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
    uint64_t     srcUid = srcUidData[0];
    TSKEY        startTs = srcStartTsCol[0];
    TSKEY        endTs = srcEndTsCol[0];
    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);

    blockDataCleanup(pSrcBlock);
    if (pPreRes == NULL) {
      // readPreVersionData returns NULL when no row was read: no range to generate
      return TSDB_CODE_SUCCESS;
    }
    printDataBlock(pPreRes, "pre res");

    int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    rows = pPreRes->info.rows;

    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
      appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
                                       &groupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }

  // re-read the raw column pointers: pSrcBlock may have been rebuilt above
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  srcUidData = (uint64_t*)pSrcUidCol->pData;

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  // `i` is advanced by getSlidingWindow, which consumes one or more source rows per call
  for (int32_t i = 0; i < rows;) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }

    TSKEY calStartTs = srcStartTsCol[i];
    colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
                                       pInfo->partitionSup.needCalc);
    TSKEY       calEndTs = srcStartTsCol[i - 1];
    colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataAppend(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataAppend(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataAppend(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataAppend(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
    pDestBlock->info.rows++;
  }

  return TSDB_CODE_SUCCESS;
}
1267

1268
// Copy the rows of pSrcBlock into pDestBlock as delete-result rows, resolving missing group
// ids (value 0) through getGroupIdByData and, when a table-name expression is configured,
// attaching the partition table name stored for the group.
//
// Returns TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity.
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    // var-string buffer (length header + name bytes); a byte array, not an array of pointers
    char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};

    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }

    if (pInfo->tbnameCalSup.pExprInfo) {
      void* parTbname = NULL;
      // only copy when the lookup succeeded; a failed lookup leaves the name empty
      if (streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname) >= 0 &&
          parTbname != NULL) {
        memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
        varDataSetLen(tbname, strlen(varDataVal(tbname)));
      }
      tdbFree(parTbname);
    }

    // pass NULL when no (or an empty) table name was resolved
    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     varDataLen(tbname) == 0 ? NULL : tbname);
  }

  return TSDB_CODE_SUCCESS;
}

1310 1311 1312 1313
// Fill pDestBlock with the scan ranges derived from pSrcBlock, using the generator that
// matches the parent window type (interval, session/state, or plain delete result), then
// stamp the block as STREAM_CLEAR with the source version and refresh its time window.
//
// Returns the code reported by the selected generator.
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t (*buildRange)(SStreamScanInfo*, SSDataBlock*, SSDataBlock*) = generateDeleteResultBlock;

  if (isIntervalWindow(pInfo)) {
    buildRange = generateIntervalScanRange;
  } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
    buildRange = generateSessionScanRange;
  }

  int32_t code = buildRange(pInfo, pSrcBlock, pDestBlock);

  // the block header is updated regardless of the generator's result
  pDestBlock->info.type = STREAM_CLEAR;
  pDestBlock->info.version = pSrcBlock->info.version;
  pDestBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pDestBlock, 0);
  return code;
}

L
Liu Jicong 已提交
1326
/*
 * Compute the partition table name of pBlock and store it in pBlock->info.parTbName.
 *
 * Nothing is done when the scan has no table-name expressions, or when pBlock is
 * NULL or empty. Otherwise:
 *   1. the name currently stored in the stream state for the block's group id is
 *      loaded into parTbName (empty string when the lookup fails);
 *   2. the table-name expressions are evaluated on the first row of pBlock and the
 *      result, truncated to TSDB_TABLE_NAME_LEN - 1 bytes, replaces parTbName
 *      (empty string when the expression yields no value);
 *   3. a non-empty name for a non-zero group id is written back to the stream state.
 *
 * If the temporary blocks needed for step 2 cannot be allocated, the function logs
 * the failure and returns with parTbName as left by step 1.
 */
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp*    pTbNameCalSup = &pInfo->tbnameCalSup;
  SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;

  if (pTbNameCalSup->numOfExprs == 0) return;
  if (pBlock == NULL || pBlock->info.rows == 0) return;

  // step 1: seed parTbName from the stream state
  void* tbname = NULL;
  if (streamStateGetParName(pState, pBlock->info.id.groupId, &tbname) < 0) {
    pBlock->info.parTbName[0] = 0;
  } else {
    memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
  }
  tdbFree(tbname);

  // step 2: evaluate the table-name expressions on a one-row copy of the block
  SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
  SSDataBlock* pResBlock = createDataBlock();
  if (pSrcBlock == NULL || pResBlock == NULL) {
    qError("failed to allocate temporary blocks when calculating the partition table name");
    if (pSrcBlock != NULL) blockDataDestroy(pSrcBlock);
    if (pResBlock != NULL) blockDataDestroy(pResBlock);
    return;
  }
  ASSERT(pSrcBlock->info.rows == 1);

  // the result block holds exactly one varchar column wide enough for a table name
  pResBlock->info.rowSize = VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN;
  SColumnInfoData data = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, TSDB_TABLE_NAME_LEN, 0);
  taosArrayPush(pResBlock->pDataBlock, &data);
  blockDataEnsureCapacity(pResBlock, 1);

  projectApplyFunctions(pTbNameCalSup->pExprInfo, pResBlock, pSrcBlock, pTbNameCalSup->pCtx, 1, NULL);
  ASSERT(pResBlock->info.rows == 1);
  ASSERT(taosArrayGetSize(pResBlock->pDataBlock) == 1);
  SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, 0);
  ASSERT(pCol->info.type == TSDB_DATA_TYPE_VARCHAR);

  void* pData = colDataGetData(pCol, 0);
  // TODO check tbname validation
  if (pData != (void*)-1 && pData != NULL) {
    // zero the whole buffer first so the truncated copy is always NUL-terminated
    memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
    int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
    memcpy(pBlock->info.parTbName, varDataVal(pData), len);
  } else {
    pBlock->info.parTbName[0] = 0;
  }

  // step 3: remember the computed name for this group
  if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) {
    streamStatePutParName(pState, pBlock->info.id.groupId, pBlock->info.parTbName);
  }

  blockDataDestroy(pSrcBlock);
  blockDataDestroy(pResBlock);
}

1374 1375
/*
 * Append one row to a stream "special" block and bump its row counter.
 *
 * pStartTs / pEndTs are written to both the window start/end columns and the
 * calculate start/end columns; pUid and pGp fill the uid and group-id columns.
 * pTbName goes into the table-name column and is stored as NULL when the
 * pointer itself is NULL.
 */
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  SArray*     pCols = pBlock->pDataBlock;
  const char* startTs = (const char*)pStartTs;
  const char* endTs = (const char*)pEndTs;

  // window range
  colDataAppend(taosArrayGet(pCols, START_TS_COLUMN_INDEX), pBlock->info.rows, startTs, false);
  colDataAppend(taosArrayGet(pCols, END_TS_COLUMN_INDEX), pBlock->info.rows, endTs, false);

  // table uid and group id
  colDataAppend(taosArrayGet(pCols, UID_COLUMN_INDEX), pBlock->info.rows, (const char*)pUid, false);
  colDataAppend(taosArrayGet(pCols, GROUPID_COLUMN_INDEX), pBlock->info.rows, (const char*)pGp, false);

  // the calculate range mirrors the window range
  colDataAppend(taosArrayGet(pCols, CALCULATE_START_TS_COLUMN_INDEX), pBlock->info.rows, startTs, false);
  colDataAppend(taosArrayGet(pCols, CALCULATE_END_TS_COLUMN_INDEX), pBlock->info.rows, endTs, false);

  // optional table name
  colDataAppend(taosArrayGet(pCols, TABLE_NAME_COLUMN_INDEX), pBlock->info.rows, (const char*)pTbName,
                pTbName == NULL);

  pBlock->info.rows += 1;
}

1393
/*
 * Run the update check over every row of pBlock.
 *
 * For each row the timestamp is registered with the update info
 * (updateInfoIsUpdated) and, for rows that are overdue on an already inserted
 * table, the enclosing interval window is checked for being closed and deleted.
 *
 * When `out` is true, pInfo->pUpdateDataRes is reset first and every row that is
 * an update or falls into such a closed window is appended to it (group id 0);
 * closed-window rows get a second entry carrying the calculated group id when
 * partitionSup.needCalc is set. If any row was collected the block is stamped
 * with the source version and typed STREAM_DELETE_DATA (needCalc) or STREAM_CLEAR.
 *
 * When `out` is false only the per-row checks are executed and
 * pInfo->pUpdateDataRes is left untouched.
 *
 * `invertible` is not used by this function.
 */
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
    // up to two result rows may be produced per input row
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
  bool   tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);

  for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
    // zero the whole struct: it is handed to getActiveTimeWindow() by pointer and
    // must not expose indeterminate fields
    SResultRowInfo dumyInfo = {0};
    dumyInfo.cur.pageId = -1;

    bool        isClosed = false;
    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    if (tableInserted && isOverdue(tsCol[rowId], &pInfo->twAggSup)) {
      win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }

    // must check update info first.
    bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]);
    bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
                     isDeletedStreamWindow(&win, pBlock->info.id.groupId,
                                           pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup);

    if ((update || closedWin) && out) {
      qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
      uint64_t gpId = 0;
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId,
                                       NULL);
      if (closedWin && pInfo->partitionSup.needCalc) {
        gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
        appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid,
                                         &gpId, NULL);
      }
    }
  }

  if (out && pInfo->pUpdateDataRes->info.rows > 0) {
    pInfo->pUpdateDataRes->info.version = pBlock->info.version;
    pInfo->pUpdateDataRes->info.dataLoad = 1;
    blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
    pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
  }
}
L
Liu Jicong 已提交
1435

1436
/*
 * Move the content of pBlock into the operator's result block pInfo->pRes.
 *
 * The result block takes the row count, uid and version of pBlock, is typed
 * STREAM_NORMAL and gets the group id registered for that uid. Every output
 * column of the match list is copied from the source column with the same
 * column id, or filled with NULLs when the source has no such column. Pseudo
 * columns are then appended, the filter is applied when `filter` is true, the
 * ts window is refreshed, pBlock's resources are released and the partition
 * table name is calculated.
 *
 * A pseudo-column failure other than "table not exist" releases pBlock and
 * long-jumps out through pTaskInfo->env. Otherwise the function returns 0.
 */
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SOperatorInfo* pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pRes = pInfo->pRes;

  blockDataEnsureCapacity(pRes, pBlock->info.rows);

  // carry over the identity of the source block
  pRes->info.rows = pBlock->info.rows;
  pRes->info.id.uid = pBlock->info.id.uid;
  pRes->info.type = STREAM_NORMAL;
  pRes->info.version = pBlock->info.version;
  pRes->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);

  // todo extract method
  size_t numOfMatch = taosArrayGetSize(pInfo->matchInfo.pList);
  for (int32_t i = 0; i < numOfMatch; ++i) {
    SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pItem->needOutput) {
      continue;
    }

    // look for the source column carrying the requested column id
    SColumnInfoData* pSrcCol = NULL;
    for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
      SColumnInfoData* pCandidate = bdGetColumnInfoData(pBlock, j);
      if (pCandidate->info.colId == pItem->colId) {
        pSrcCol = pCandidate;
        break;
      }
    }

    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pItem->dstSlotId);
    if (pSrcCol != NULL) {
      colDataAssign(pDst, pSrcCol, pBlock->info.rows, &pRes->info);
    } else {
      // the required column does not exist in the submit block: make it all NULL
      colDataAppendNNULL(pDst, 0, pRes->info.rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pRes,
                                          pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
    // a missing table is tolerated: it may have been dropped during the scan procedure
    bool fatal = (code != TSDB_CODE_SUCCESS) && (code != TSDB_CODE_PAR_TABLE_NOT_EXIST);
    if (fatal) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reset the error code.
    terrno = 0;
  }

  if (filter) {
    doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
  }

  pRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pRes, pInfo->primaryTsIndex);
  blockDataFreeRes((SSDataBlock*)pBlock);

  calBlockTbName(pInfo, pRes);
  return 0;
}
5
54liuyao 已提交
1500

L
Liu Jicong 已提交
1501
/*
 * Fetch the next result block for a queue scan.
 *
 * Three sources are tried, in this order:
 *   1. a pending submit request (pTaskInfo->streamInfo.pReq): its data blocks are
 *      read through the tq reader until one yields rows; when the message is
 *      exhausted the request is cleared and NULL is returned;
 *   2. snapshot data (prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA): blocks come
 *      from the table scan operator. When the table scan runs dry and no block has
 *      been returned yet, the data reader is closed, the prepare status is reset to
 *      the log at the snapshot version and the reader seeks to the following
 *      version, after which the function falls through to the log branch;
 *   3. the log (prepareStatus.type == TMQ_OFFSET__LOG): blocks are pulled with
 *      tqNextBlock() until one yields rows or the reader reports that nothing more
 *      is available, in which case the offset is recorded in lastStatus.
 *
 * Returns the block holding the rows, or NULL when nothing is available.
 */
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("queue scan called");

  if (pTaskInfo->streamInfo.pReq != NULL) {
    if (pInfo->tqReader->pMsg == NULL) {
      pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;
      const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;
      if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
        qError("submit msg messed up when initing stream submit block %p", pSubmit);
        pInfo->tqReader->pMsg = NULL;
        pTaskInfo->streamInfo.pReq = NULL;
        ASSERT(0);
        // never iterate a reader that has no message attached, even when the
        // assertion above is compiled out
        return NULL;
      }
    }

    blockDataCleanup(pInfo->pRes);
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

    while (tqNextDataBlock(pInfo->tqReader)) {
      SSDataBlock block = {0};

      int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);

      if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
        continue;
      }

      setBlockIntoRes(pInfo, &block, true);

      // the filter may have removed every row; keep reading in that case
      if (pBlockInfo->rows > 0) {
        return pInfo->pRes;
      }
    }

    pInfo->tqReader->pMsg = NULL;
    pTaskInfo->streamInfo.pReq = NULL;
    return NULL;
  }

  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
      qDebug("queue scan tsdb return %d rows", pResult->info.rows);
      pTaskInfo->streamInfo.returned = 1;
      return pResult;
    } else {
      if (!pTaskInfo->streamInfo.returned) {
        // nothing was returned from the snapshot in this round: switch to the log
        STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
        tsdbReaderClose(pTSInfo->base.dataReader);
        pTSInfo->base.dataReader = NULL;
        tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
        qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
        if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
          tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
          return NULL;
        }
        ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
      } else {
        return NULL;
      }
    }
  }

  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
    while (1) {
      SFetchRet ret = {0};
      tqNextBlock(pInfo->tqReader, &ret);
      if (ret.fetchType == FETCH_TYPE__DATA) {
        blockDataCleanup(pInfo->pRes);
        if (setBlockIntoRes(pInfo, &ret.data, true) < 0) {
          ASSERT(0);
        }
        if (pInfo->pRes->info.rows > 0) {
          pOperator->status = OP_EXEC_RECV;
          qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
          return pInfo->pRes;
        }
      } else if (ret.fetchType == FETCH_TYPE__META) {
        // meta entries are not expected on this path
        ASSERT(0);
      } else if (ret.fetchType == FETCH_TYPE__NONE ||
                 (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) {
        pTaskInfo->streamInfo.lastStatus = ret.offset;
        ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
        ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
        char formatBuf[80];
        tFormatOffset(formatBuf, sizeof(formatBuf), &ret.offset);
        qDebug("queue scan log return null, offset %s", formatBuf);
        pOperator->status = OP_OPENED;
        return NULL;
      }
    }
  } else {
    ASSERT(0);
    return NULL;
  }
}

L
Liu Jicong 已提交
1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641
/*
 * Copy into pDst only those rows of the delete block pSrc whose uid is present
 * in the tq reader's table-id hash (pInfo->tqReader->tbIdHash).
 *
 * For each kept row the start ts, end ts and uid are copied; the group id and
 * the calculate start/end columns are set to NULL. Afterwards pDst takes over
 * the block info of pSrc, except for the row count (number of kept rows) and
 * its own capacity, which is preserved.
 *
 * NOTE(review): the table-name column of pDst is not written here - confirm that
 * no consumer of the filtered block reads it.
 *
 * Always returns 0.
 */
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;
  blockDataEnsureCapacity(pDst, rows);

  // source columns
  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcStart = (uint64_t*)pSrcStartCol->pData;
  uint64_t*        srcEnd = (uint64_t*)pSrcEndCol->pData;
  uint64_t*        srcUid = (uint64_t*)pSrcUidCol->pData;

  // destination columns
  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDstGpCol = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t kept = 0;
  for (int32_t i = 0; i < rows; i++) {
    if (taosHashGet(pReader->tbIdHash, &srcUid[i], sizeof(uint64_t)) == NULL) {
      continue;  // this table is not tracked by the reader
    }

    colDataAppend(pDstStartCol, kept, (const char*)&srcStart[i], false);
    colDataAppend(pDstEndCol, kept, (const char*)&srcEnd[i], false);
    colDataAppend(pDstUidCol, kept, (const char*)&srcUid[i], false);

    colDataAppendNULL(pDstGpCol, kept);
    colDataAppendNULL(pDstCalStartCol, kept);
    colDataAppendNULL(pDstCalEndCol, kept);
    kept++;
  }

  // inherit the source block info but keep our own capacity
  uint32_t dstCapacity = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.rows = kept;
  pDst->info.capacity = dstCapacity;

  return 0;
}

5
54liuyao 已提交
1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666
// for partition by tag
/*
 * Fill the group-id column of a special block from each row's table uid.
 *
 * Only applies when the partition support does not need to calculate the group
 * id (partitionSup.needCalc is false); otherwise the block is left untouched.
 * Row i receives getGroupIdByUid(pInfo, uid[i]).
 */
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataAppend(pGpCol, i, (const char*)&groupId, false);
  }
}

L
Liu Jicong 已提交
1667 1668 1669 1670 1671
/*
 * Fetch the next block for a stream scan.
 *
 * Recovery comes first. In the PREPARE1/PREPARE2 steps the table scan condition is
 * reloaded and restricted to a version range ([0, fillHistoryVer1] or
 * [fillHistoryVer1 + 1, fillHistoryVer2]), the data reader is closed and the step
 * moves to SCAN. In the SCAN step blocks come straight from the table scan
 * operator; once more than 100 blocks have been returned back-to-back the function
 * returns NULL once to yield, and when the table scan is exhausted the recover step
 * is reset and recoverScanFinished is set.
 *
 * Outside recovery the behaviour depends on pInfo->blockType:
 *   - STREAM_INPUT__DATA_BLOCK: buffered data blocks are handed out one by one.
 *     Retrieve and delete blocks are converted into scan ranges / delete results
 *     and may switch the operator to the submit path.
 *   - STREAM_INPUT__DATA_SUBMIT: a small state machine on pInfo->scanMode first
 *     drains pending update / delete / range-scan work, then reads the buffered
 *     submit messages through the tq reader, running the update check on each
 *     block that was read.
 *
 * Returns the next block, or NULL when the buffered input is consumed.
 */
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

  qDebug("stream scan called");

  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
      qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
    } else {
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
      qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
    }

    // drop the current reader so the table scan restarts with the new condition
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;

    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
    pTaskInfo->streamInfo.recoverScanFinished = false;
  }

  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
    // yield after a run of consecutive recover blocks
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }

    SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
    if (pBlock != NULL) {
      pInfo->blockRecoverContiCnt++;
      calBlockTbName(pInfo, pBlock);
      if (pInfo->pUpdateInfo) {
        TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
        pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
      }
      qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
      printDataBlock(pBlock, "scan recover");
      return pBlock;
    }

    // the recover scan is exhausted: leave recover mode and lift the version limits
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    tsdbReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;

    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;

    pTaskInfo->streamInfo.recoverScanFinished = true;
    return NULL;
  }

  size_t total = taosArrayGetSize(pInfo->pBlockLists);
// TODO: refactor
FETCH_NEXT_BLOCK:
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
    if (pInfo->validBlockIndex >= total) {
      doClearBufferedBlocks(pInfo);
      return NULL;
    }

    int32_t      current = pInfo->validBlockIndex++;
    SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
    if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) {
      streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
    }
    // TODO move into scan
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
    pBlock->info.dataLoad = 1;
    blockDataUpdateTsWindow(pBlock, 0);
    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
      case STREAM_RETRIEVE: {
        // the requested ranges are served by the submit path on the next call;
        // the retrieve block itself is returned below
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
        copyDataBlock(pInfo->pUpdateRes, pBlock);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
        printDataBlock(pBlock, "stream scan delete recv");
        // With a tq reader the delete rows are filtered into a freshly created
        // block owned by this function; without one pDelBlock aliases the input
        // block, which this function must not destroy.
        SSDataBlock* pDelBlock = NULL;
        if (pInfo->tqReader) {
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
        } else {
          pDelBlock = pBlock;
        }
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
          printDataBlock(pDelBlock, "stream scan delete result");
          // destroy only the block created above, like the other release sites of
          // this case do
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }

          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
        }
      } break;
      default:
        break;
    }
    return pBlock;
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
    qDebug("scan mode %d", pInfo->scanMode);
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        // NOTE(review): pUpdateRes is destroyed here but is passed to
        // copyDataBlock()/generateScanRange() by other branches - confirm that it
        // is re-created before the next use.
        blockDataDestroy(pInfo->pUpdateRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        return pInfo->pRes;
      } break;
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          uint64_t        version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
          updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          // out == false: the update check runs without collecting rows
          checkUpdateData(pInfo, true, pSDB, false);
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        // the range scan is finished: go back to reading submit messages
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
    }

    // a state window may have queued ranges to be re-scanned
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
      return pInfo->pUpdateRes;
    }

    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);

  NEXT_SUBMIT_BLK:
    while (1) {
      if (pInfo->tqReader->pMsg == NULL) {
        // no message attached to the reader: take the next buffered submit message
        if (pInfo->validBlockIndex >= totBlockNum) {
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
          doClearBufferedBlocks(pInfo);
          qDebug("stream scan return empty, consume block %d", totBlockNum);
          return NULL;
        }

        int32_t     current = pInfo->validBlockIndex++;
        SSubmitReq* pSubmit = taosArrayGetP(pInfo->pBlockLists, current);
        if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          pInfo->tqReader->pMsg = NULL;
          continue;
        }
      }

      blockDataCleanup(pInfo->pRes);

      while (tqNextDataBlock(pInfo->tqReader)) {
        SSDataBlock block = {0};

        int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);

        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

        // filtering is postponed until after the update check below
        setBlockIntoRes(pInfo, &block, false);

        if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId,
                             pInfo->pRes->info.version)) {
          printDataBlock(pInfo->pRes, "stream scan ignore");
          blockDataCleanup(pInfo->pRes);
          continue;
        }

        if (pInfo->pUpdateInfo) {
          checkUpdateData(pInfo, true, pInfo->pRes, true);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
          if (pInfo->pUpdateDataRes->info.rows > 0) {
            // pick the scan mode that will consume the collected update rows
            pInfo->updateResIndex = 0;
            if (pInfo->pUpdateDataRes->info.type == STREAM_CLEAR) {
              pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
            } else if (pInfo->pUpdateDataRes->info.type == STREAM_INVERT) {
              pInfo->scanMode = STREAM_SCAN_FROM_RES;
              return pInfo->pUpdateDataRes;
            } else if (pInfo->pUpdateDataRes->info.type == STREAM_DELETE_DATA) {
              pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
            }
          }
        }

        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
          break;
        }
      }
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
        break;
      } else {
        // this message produced nothing: detach it and move on to the next one
        pInfo->tqReader->pMsg = NULL;
        continue;
      }
    }

    // record the scan action.
    pInfo->numOfExec++;
    pOperator->resultInfo.totalRows += pBlockInfo->rows;

    qDebug("scan rows: %d", pBlockInfo->rows);
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }

    // no data rows, but update rows were collected: let the scan-mode switch
    // above handle them
    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
  } else {
    ASSERT(0);
    return NULL;
  }
}

H
Haojun Liao 已提交
1998
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
1999 2000 2001
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2002 2003 2004
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2005 2006 2007 2008 2009 2010
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2011
// Fetch function of the raw scan operator.
//
// Depending on pTaskInfo->streamInfo.prepareStatus.type this either
//  - TMQ_OFFSET__SNAPSHOT_DATA: returns the next data block of the table currently being read, or,
//    when that table has no more blocks, moves on to the next uid of the snapshot and returns NULL;
//  - TMQ_OFFSET__SNAPSHOT_META: fetches one meta entry from the snapshot, publishes it through
//    streamInfo.metaRsp and returns NULL.
// Any other offset type returns NULL. A former TMQ_OFFSET__LOG (WAL) branch existed only as
// commented-out code and is not handled here.
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
  // NOTE: this operator never checks whether its own status is done or not
  SStreamRawScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;

  // metaRspLen != 0 is used to judge whether the response carries meta
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;

  qDebug("tmqsnap doRawScan called");

  if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
      if (isTaskKilled(pTaskInfo)) {
        longjmp(pTaskInfo->env, pTaskInfo->code);
      }

      SSDataBlock* pDataBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
      if (pDataBlock == NULL) {
        longjmp(pTaskInfo->env, terrno);
      }

      qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pDataBlock->info.id.uid);

      // remember how far the snapshot data has been consumed
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
      pTaskInfo->streamInfo.lastStatus.uid = pDataBlock->info.id.uid;
      pTaskInfo->streamInfo.lastStatus.ts = pDataBlock->info.window.ekey;
      return pDataBlock;
    }

    // the current table is exhausted (or no reader exists yet): pick the next uid of the snapshot
    SMetaTableInfo nextTable = getUidfromSnapShot(pInfo->sContext);
    pTaskInfo->streamInfo.prepareStatus.uid = nextTable.uid;

    if (nextTable.uid != 0) {
      pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
      qDebug("tmqsnap change get data uid:%" PRId64 "", nextTable.uid);
      qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
    } else {
      // read snapshot done, change to get data from wal
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
      pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
    }

    tDeleteSSchemaWrapper(nextTable.schema);
    qDebug("tmqsnap stream scan tsdb return null");
    return NULL;
  }

  if (pTaskInfo->streamInfo.prepareStatus.type != TMQ_OFFSET__SNAPSHOT_META) {
    return NULL;
  }

  SSnapContext* sContext = pInfo->sContext;
  void*         pMeta = NULL;
  int32_t       metaLen = 0;
  int16_t       msgType = 0;
  int64_t       metaUid = 0;
  if (getMetafromSnapShot(sContext, &pMeta, &metaLen, &msgType, &metaUid) < 0) {
    qError("tmqsnap getMetafromSnapShot error");
    taosMemoryFreeClear(pMeta);
    return NULL;
  }

  pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
  pTaskInfo->streamInfo.lastStatus.uid = metaUid;

  if (sContext->queryMetaOrData) {
    // hand the meta entry over to the response
    pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
    pTaskInfo->streamInfo.metaRsp.resMsgType = msgType;
    pTaskInfo->streamInfo.metaRsp.metaRspLen = metaLen;
    pTaskInfo->streamInfo.metaRsp.metaRsp = pMeta;
  } else {
    // change to get data on the next poll request
    pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
    pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
    pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
  }

  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2122
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2123 2124 2125 2126 2127 2128
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2129 2130 2131
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2132
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2133 2134 2135 2136 2137
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2138 2139
  int32_t code = TSDB_CODE_SUCCESS;

2140
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2141
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2142
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2143 2144
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2145 2146
  }

wmmhello's avatar
wmmhello 已提交
2147 2148
  pInfo->vnode = pHandle->vnode;

2149
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2150 2151
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2152

2153
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2154
  return pOperator;
H
Haojun Liao 已提交
2155

L
Liu Jicong 已提交
2156
_end:
H
Haojun Liao 已提交
2157 2158 2159 2160
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2161 2162
}

2163
static void destroyStreamScanOperatorInfo(void* param) {
2164 2165
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
5
54liuyao 已提交
2166
    destroyOperatorInfo(pStreamScan->pTableScanOp);
2167 2168 2169 2170
  }
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
H
Haojun Liao 已提交
2171 2172
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2173
  }
C
Cary Xu 已提交
2174 2175
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2176
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2177
  }
C
Cary Xu 已提交
2178

L
Liu Jicong 已提交
2179 2180
  cleanupExprSupp(&pStreamScan->tbnameCalSup);

L
Liu Jicong 已提交
2181
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2182 2183 2184 2185
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2186
  blockDataDestroy(pStreamScan->pUpdateDataRes);
2187 2188 2189 2190
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2191
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2192
                                            SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2193
  SArray*          pColIds = NULL;
2194 2195
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2196

H
Haojun Liao 已提交
2197
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2198
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2199
    goto _error;
H
Haojun Liao 已提交
2200 2201
  }

2202
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2203
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2204

2205
  pInfo->pTagCond = pTagCond;
2206
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2207

2208
  int32_t numOfCols = 0;
2209 2210
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2211 2212 2213
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2214

H
Haojun Liao 已提交
2215
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2216
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2217
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2218
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2219 2220

    int16_t colId = id->colId;
2221
    taosArrayPush(pColIds, &colId);
2222
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2223
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2224
    }
H
Haojun Liao 已提交
2225 2226
  }

L
Liu Jicong 已提交
2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
      goto _error;
    }
  }

2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
    SExprInfo* pTagExpr = createExprInfo(pTableScanNode->pTags, NULL, &numOfTags);
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  }

H
Haojun Liao 已提交
2253 2254
  pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
  if (pInfo->pBlockLists == NULL) {
2255 2256
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
H
Haojun Liao 已提交
2257 2258
  }

5
54liuyao 已提交
2259
  if (pHandle->vnode) {
L
Liu Jicong 已提交
2260
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
L
Liu Jicong 已提交
2261
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2262
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2263
      pTSInfo->base.cond.endVersion = pHandle->version;
2264
    }
L
Liu Jicong 已提交
2265

2266
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2267
    int32_t        num = 0;
H
Haojun Liao 已提交
2268
    tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num);
2269

2270
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2271
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2272
      pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
2273 2274
      code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock,
                            &pTSInfo->base.dataReader, NULL);
dengyihao's avatar
dengyihao 已提交
2275 2276
      if (code != 0) {
        terrno = code;
H
Haojun Liao 已提交
2277
        destroyTableScanOperatorInfo(pTableScanOp);
2278
        goto _error;
L
Liu Jicong 已提交
2279
      }
L
Liu Jicong 已提交
2280 2281
    }

L
Liu Jicong 已提交
2282 2283 2284 2285
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2286
    } else {
L
Liu Jicong 已提交
2287 2288
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2289 2290
    }

2291
    pInfo->pUpdateInfo = NULL;
2292
    pInfo->pTableScanOp = pTableScanOp;
2293 2294 2295
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2296

L
Liu Jicong 已提交
2297 2298
    pInfo->readHandle = *pHandle;
    pInfo->tableUid = pScanPhyNode->uid;
L
Liu Jicong 已提交
2299
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
L
Liu Jicong 已提交
2300

L
Liu Jicong 已提交
2301
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2302
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
H
Haojun Liao 已提交
2303
    SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList);
2304
    code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2305 2306 2307 2308 2309
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2310
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2311 2312
  } else {
    taosArrayDestroy(pColIds);
H
Haojun Liao 已提交
2313
    pColIds = NULL;
5
54liuyao 已提交
2314 2315
  }

2316 2317 2318 2319 2320
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2321 2322 2323 2324 2325
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2326
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2327
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2328
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2329
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2330
  pInfo->groupId = 0;
2331
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2332
  pInfo->pStreamScanOp = pOperator;
2333
  pInfo->deleteDataIndex = 0;
2334
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2335
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2336
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2337
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2338
  pInfo->partitionSup.needCalc = false;
L
Liu Jicong 已提交
2339

L
Liu Jicong 已提交
2340 2341
  setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2342
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2343

L
Liu Jicong 已提交
2344
  __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2345 2346
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2347

H
Haojun Liao 已提交
2348
  return pOperator;
2349

L
Liu Jicong 已提交
2350
_error:
H
Haojun Liao 已提交
2351 2352 2353 2354 2355 2356 2357 2358
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2359 2360
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2361 2362
}

2363
// Fetch function of the tag scan operator.
// Starting at pInfo->curPos, emits one row per table of the task's table list until either the
// list is exhausted or the result capacity is reached. Each output column is either the table
// name (scan pseudo column function) or a tag value read from the table meta.
// Returns the result block, or NULL when no row was produced.
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprInfo = &pOperator->exprSupp.pExprInfo[0];
  SSDataBlock*   pRes = pInfo->pRes;

  blockDataCleanup(pRes);

  int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
  if (numOfTables == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        nameBuf[512] = {0};
  int32_t     numOfRows = 0;
  SMetaReader mr = {0};
  metaReaderInit(&mr, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < numOfTables && numOfRows < pOperator->resultInfo.capacity) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->curPos);

    int32_t code = metaGetTableEntryByUid(&mr, pKeyInfo->uid);
    tDecoderClear(&mr.coder);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKeyInfo->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&mr);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    int32_t j = 0;
    while (j < pOperator->exprSupp.numOfExprs) {
      SExprInfo*       pExpr = &pExprInfo[j];
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExpr->base.resSchema.slotId);
      j += 1;

      // refactor later
      if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
        STR_TO_VARSTR(nameBuf, mr.me.name);
        colDataAppend(pDst, numOfRows, nameBuf, false);
        continue;
      }

      // it is a tag value
      STagVal val = {0};
      val.cid = pExpr->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val);

      bool isJson = (pDst->info.type == TSDB_DATA_TYPE_JSON);
      if (!isJson && p != NULL) {
        // plain tag: convert the STagVal into column data
        char* data = tTagValToData((const STagVal*)p, false);
        colDataAppend(pDst, numOfRows, data, data == NULL);

        // the converted buffer is released here for variable length types
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && data != NULL) {
          taosMemoryFree(data);
        }
      } else {
        // json tag, or no value at all: use the returned pointer directly
        char* data = (char*)p;
        colDataAppend(pDst, numOfRows, data, (data == NULL) || (isJson && tTagIsJsonNull(data)));
      }
    }

    numOfRows += 1;
    pInfo->curPos += 1;
    if (pInfo->curPos >= numOfTables) {
      setOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&mr);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = numOfRows;
  pOperator->resultInfo.totalRows += numOfRows;

  if (pRes->info.rows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

2444
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2445 2446
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2447
  taosArrayDestroy(pInfo->matchInfo.pList);
D
dapan1121 已提交
2448
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2449 2450
}

S
slzhou 已提交
2451 2452
// Create the tag scan operator for pPhyNode.
//
// pReadHandle is copied into the operator info; pTaskInfo is the owning task.
// Returns the operator, or NULL on failure. On failure everything built so far is released and
// terrno holds the code of the step that failed (TSDB_CODE_OUT_OF_MEMORY for the allocations).
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
                                         SExecTaskInfo* pTaskInfo) {
  // until a later step overwrites it, a failure means one of the two allocations below failed
  int32_t        code = TSDB_CODE_OUT_OF_MEMORY;
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;

  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
  initResultSizeInfo(&pOperator->resultInfo, 4096);
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);

  return pOperator;

_error:
  // both structures are zero-initialised, so members that were never set are still NULL here
  if (pInfo != NULL) {
    taosArrayDestroy(pInfo->matchInfo.pList);
  }
  if (pOperator != NULL) {
    cleanupExprSupp(&pOperator->exprSupp);
  }
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = code;
  return NULL;
}
2494

dengyihao's avatar
dengyihao 已提交
2495
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2496 2497 2498 2499 2500 2501 2502
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;

H
Haojun Liao 已提交
2503
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2504

L
Liu Jicong 已提交
2505 2506
  int64_t      st = taosGetTimestampUs();
  void*        p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2507
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2508

L
Liu Jicong 已提交
2509 2510
  int32_t code =
      tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
dengyihao's avatar
dengyihao 已提交
2511
  if (code != 0) {
H
Haojun Liao 已提交
2512
    T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
dengyihao 已提交
2513
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2514

H
Haojun Liao 已提交
2515
  STsdbReader* reader = pInfo->base.dataReader;
dengyihao's avatar
opt mem  
dengyihao 已提交
2516
  while (tsdbNextDataBlock(reader)) {
H
Haojun Liao 已提交
2517
    if (isTaskKilled(pTaskInfo)) {
2518
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2519 2520 2521
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2522
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2523 2524 2525 2526
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2527
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2528 2529 2530 2531
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2532 2533

    uint32_t status = 0;
H
Haojun Liao 已提交
2534
    loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2535
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2536
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2537
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2538 2539 2540 2541 2542 2543 2544
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

H
Haojun Liao 已提交
2545
    pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2546

H
Haojun Liao 已提交
2547
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2548
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2549

H
Haojun Liao 已提交
2550 2551
    tsdbReaderClose(pInfo->base.dataReader);
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2552 2553
    return pBlock;
  }
H
Haojun Liao 已提交
2554

H
Haojun Liao 已提交
2555 2556
  tsdbReaderClose(pInfo->base.dataReader);
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2557 2558 2559
  return NULL;
}

2560 2561 2562
// Build a one-element list of SBlockOrderInfo that orders by the primary timestamp column.
// The slot id is taken from the entry of colMatchInfo whose column id is
// PRIMARYKEY_TIMESTAMP_COL_ID (the last such entry wins; 0 is used when there is none).
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsSlotId = 0;

  int32_t idx = 0;
  while (idx < taosArrayGetSize(colMatchInfo)) {
    SColMatchItem* pItem = taosArrayGet(colMatchInfo, idx);
    if (pItem->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      tsSlotId = pItem->dstSlotId;
    }
    idx += 1;
  }

  SArray* pOrderList = taosArrayInit(1, sizeof(SBlockOrderInfo));

  SBlockOrderInfo orderInfo = {0};
  orderInfo.nullFirst = NULL_ORDER_FIRST;
  orderInfo.slotId = tsSlotId;
  orderInfo.order = order;
  taosArrayPush(pOrderList, &orderInfo);

  return pOrderList;
}

H
Haojun Liao 已提交
2580
// Copy the query condition src into dst, giving dst its own copy of the column list.
// All other members are copied bytewise, so any further pointer members stay shared with src.
// Returns 0 on success, or TSDB_CODE_OUT_OF_MEMORY when the column list cannot be allocated
// (dst->colList is NULL in that case and no longer aliases src->colList).
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
  memcpy(dst, src, sizeof(SQueryTableDataCond));

  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  if (dst->colList == NULL) {
    // a NULL result is only a failure when there is something to copy
    return (src->numOfCols > 0) ? TSDB_CODE_OUT_OF_MEMORY : 0;
  }

  for (int i = 0; i < src->numOfCols; i++) {
    dst->colList[i] = src->colList[i];
  }
  return 0;
}
H
Haojun Liao 已提交
2588

2589
// Prepare the merge scan of the table group that starts at pInfo->tableStartIndex.
//
// Determines the last table of the group, creates a multi-source merge sort handle with one
// source per table (each with its own input block and its own copy of the query condition) and
// opens it. Errors are reported by long-jumping to the task environment.
// Returns TSDB_CODE_SUCCESS.
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  {
    // the group ends right before the first table that carries a different group id
    size_t  numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < numOfTables; ++i) {
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  pInfo->base.dataReader = NULL;

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  for (int32_t i = 0; i < numOfTable; ++i) {
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);

    taosArrayPush(pInfo->sortSourceParams, &param);

    // every source gets its own query condition because its time window is advanced independently
    SQueryTableDataCond cond;
    int32_t             dumpCode = dumpQueryTableCond(&pInfo->base.cond, &cond);
    if (dumpCode != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, dumpCode);
    }
    taosArrayPush(pInfo->queryConds, &cond);
  }

  for (int32_t i = 0; i < numOfTable; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      // do not write through a failed allocation
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    ps->onlyRef = true;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  return TSDB_CODE_SUCCESS;
}

// Tear down the per-group state of a table merge scan.
//
// Folds the statistics of the finished sort into pInfo->sortExecInfo, destroys the input block of every sort
// source, clears the source-param array, destroys the sort handle and releases the per-table query conditions.
//
// pOperator: operator whose `info` is a STableMergeScanInfo.
// Returns TSDB_CODE_SUCCESS.
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;

  // sortMethod/sortBuffer are overwritten with the latest sort's values; the counters accumulate
  SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
  pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
  pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
  pInfo->sortExecInfo.loops += sortExecInfo.loops;
  pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
  pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;

  // bound the walk by the length of the array that is actually indexed
  int32_t numOfParams = taosArrayGetSize(pInfo->sortSourceParams);
  for (int32_t i = 0; i < numOfParams; ++i) {
    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    blockDataDestroy(param->inputBlock);
  }
  taosArrayClear(pInfo->sortSourceParams);

  tsortDestroySortHandle(pInfo->pSortHandle);
  pInfo->pSortHandle = NULL;

  // each stored condition owns its colList
  int32_t numOfConds = taosArrayGetSize(pInfo->queryConds);
  for (int32_t i = 0; i < numOfConds; ++i) {
    SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
    taosMemoryFree(cond->colList);
  }
  taosArrayDestroy(pInfo->queryConds);
  pInfo->queryConds = NULL;

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
2686 2687
// Fill pResBlock with sorted rows pulled from pHandle.
//
// The block is cleaned first, then tuples are appended one at a time until either the sort handle returns no
// more tuples or the block holds at least `capacity` rows. The block is then passed through applyLimitOffset and
// its row count is added to limitInfo.numOfOutputRows.
//
// Returns pResBlock when it holds at least one row afterwards, NULL otherwise.
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  STableMergeScanInfo* pScanInfo = pOperator->info;

  blockDataCleanup(pResBlock);

  for (STupleHandle* pTuple = tsortNextTuple(pHandle); pTuple != NULL; pTuple = tsortNextTuple(pHandle)) {
    appendOneRowToDataBlock(pResBlock, pTuple);
    if (pResBlock->info.rows >= capacity) {
      break;  // block is full; the next tuple is fetched on the following call
    }
  }

  qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTask), pResBlock->info.rows);
  applyLimitOffset(&pScanInfo->limitInfo, pResBlock, pTask, pOperator);
  pScanInfo->limitInfo.numOfOutputRows += pResBlock->info.rows;

  if (pResBlock->info.rows > 0) {
    return pResBlock;
  }
  return NULL;
}

// Produce the next result block of the table merge scan, one table group at a time.
//
// On the first call the scan is started for the group of table index 0. Each call then pulls a sorted block for
// the current group; when the group yields nothing it is stopped and the scan restarts from the table after
// tableEndIndex. The operator is marked completed once tableEndIndex reaches the last table (or when the table
// list is empty).
//
// Returns a block tagged with the current group id, or NULL when there is nothing more to return.
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  STableMergeScanInfo* pScan = pOperator->info;
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;

  int32_t openCode = pOperator->fpSet._openFn(pOperator);
  if (openCode != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTask->env, openCode);
  }

  size_t numOfTables = tableListGetSize(pTask->pTableInfoList);

  if (!pScan->hasGroupId) {
    pScan->hasGroupId = true;

    if (numOfTables == 0) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pScan->tableStartIndex = 0;
    pScan->groupId = ((STableKeyInfo*)tableListGetInfo(pTask->pTableInfoList, pScan->tableStartIndex))->groupId;
    startGroupTableMergeScan(pOperator);
  }

  SSDataBlock* pResult = NULL;
  while (pScan->tableStartIndex < numOfTables) {
    pResult = getSortedTableMergeScanBlockData(pScan->pSortHandle, pScan->pResBlock, pOperator->resultInfo.capacity,
                                               pOperator);
    if (pResult != NULL) {
      pResult->info.id.groupId = pScan->groupId;
      pOperator->resultInfo.totalRows += pResult->info.rows;
      return pResult;
    }

    // current group produced nothing: close it and move on to the next group, if any
    stopGroupTableMergeScan(pOperator);
    if (pScan->tableEndIndex >= numOfTables - 1) {
      setOperatorCompleted(pOperator);
      break;
    }

    pScan->tableStartIndex = pScan->tableEndIndex + 1;
    pScan->groupId = tableListGetInfo(pTask->pTableInfoList, pScan->tableStartIndex)->groupId;
    startGroupTableMergeScan(pOperator);
  }

  return pResult;
}

2761
void destroyTableMergeScanOperatorInfo(void* param) {
2762
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2763
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2764

dengyihao's avatar
dengyihao 已提交
2765 2766 2767
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2768 2769
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
2770
  }
H
Haojun Liao 已提交
2771

2772
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2773 2774
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2775

H
Haojun Liao 已提交
2776 2777
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2778

dengyihao's avatar
opt mem  
dengyihao 已提交
2779 2780 2781
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2782
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2783
  taosArrayDestroy(pTableScanInfo->queryConds);
2784

H
Haojun Liao 已提交
2785 2786
  if (pTableScanInfo->base.matchInfo.pList != NULL) {
    taosArrayDestroy(pTableScanInfo->base.matchInfo.pList);
2787 2788 2789 2790 2791 2792
  }

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
H
Haojun Liao 已提交
2793
  cleanupExprSupp(&pTableScanInfo->base.pseudoSup);
L
Liu Jicong 已提交
2794

H
Haojun Liao 已提交
2795 2796 2797 2798
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;
  taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache);

D
dapan1121 已提交
2799
  taosMemoryFreeClear(param);
2800 2801 2802 2803
}

// Build the EXPLAIN execution info of a table merge scan operator.
//
// Allocates a STableMergeScanExecInfo holding a copy of the operator's block-load recorder and its sort
// statistics. The allocated struct is handed to the caller through pOptrExplain.
//
// pOptr:        table merge scan operator, must not be NULL.
// pOptrExplain: receives the allocated exec info (NULL on failure).
// len:          receives sizeof(STableMergeScanExecInfo) (0 on failure).
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the allocation fails.
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->base.readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
2816 2817
// Create the table merge scan operator for a table scan physical plan node.
//
// pTableScanNode: physical plan node describing the scan (columns, pseudo columns, conditions, limits, ratio).
// readHandle:     read handle, copied into the operator info.
// pTaskInfo:      owning task; receives the error code in pTaskInfo->code on failure.
// Returns the new operator, or NULL on failure.
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
                                                SExecTaskInfo* pTaskInfo) {
  // Declared before the first `goto _error` so the error path always reads an initialized value. An allocation
  // failure is the only way to reach _error without assigning `code` first, hence the initial value.
  int32_t code = TSDB_CODE_OUT_OF_MEMORY;
  int32_t numOfCols = 0;

  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
                             &pInfo->base.matchInfo);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(pInfo->base.matchInfo.pList);
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    SExprSupp* pSup = &pInfo->base.pseudoSup;
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

  // NOTE(review): members allocated above are not released when the steps below fail — confirm and clean up.
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
  pInfo->base.readHandle = *readHandle;

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;

  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultSizeInfo(&pOperator->resultInfo, 1024);
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));

  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);

  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);

  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->exprSupp.numOfExprs = numOfCols;

  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  // report the failure that actually happened; fall back to OOM if no specific code was recorded
  pTaskInfo->code = (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
2897 2898 2899 2900

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
2901
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
2902 2903 2904 2905 2906 2907
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName);
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
2908 2909
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
2910 2911 2912 2913 2914 2915 2916
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

// Record the output slot ids of the "db_name" and "stable_name" columns found in scanCols.
//
// Every entry of scanCols must be a target node wrapping a column node; otherwise TSDB_CODE_QRY_SYS_ERROR is
// returned. Columns with other names are ignored. A NULL list is accepted and leaves supp untouched.
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCur = NULL;
  FOREACH(pCur, scanCols) {
    if (nodeType(pCur) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pCur;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SColumnNode* pCol = (SColumnNode*)(pTarget->pExpr);
    if (0 == strcmp(pCol->colName, GROUP_TAG_DB_NAME)) {
      supp->dbNameSlotId = pTarget->slotId;
    } else if (0 == strcmp(pCol->colName, GROUP_TAG_STABLE_NAME)) {
      supp->stbNameSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Record the output slot id of the table-count function found in pseudoCols.
//
// Every entry of pseudoCols must be a target node wrapping a function node; otherwise TSDB_CODE_QRY_SYS_ERROR is
// returned. Functions of other types are ignored. A NULL list is accepted and leaves supp untouched.
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCur = NULL;
  FOREACH(pCur, pseudoCols) {
    if (nodeType(pCur) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pCur;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_FUNCTION) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SFunctionNode* pFunc = (SFunctionNode*)(pTarget->pExpr);
    if (pFunc->funcType == FUNCTION_TYPE_TABLE_COUNT) {
      supp->tbCountSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Derive the grouping flags or the name filters of a table count scan.
//
// With group tags: each tag must be a column node (TSDB_CODE_QRY_SYS_ERROR otherwise); "db_name" sets
// supp->groupByDbName and "stable_name" sets supp->groupByStbName.
// Without group tags: the db name and table name of tableName are copied into supp->dbNameFilter and
// supp->stbNameFilter.
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags != NULL) {
    SNode* pNode = NULL;
    FOREACH(pNode, groupTags) {
      if (nodeType(pNode) != QUERY_NODE_COLUMN) {
        return TSDB_CODE_QRY_SYS_ERROR;
      }
      SColumnNode* colNode = (SColumnNode*)pNode;
      if (strcmp(colNode->colName, GROUP_TAG_DB_NAME) == 0) {
        supp->groupByDbName = true;
      }
      if (strcmp(colNode->colName, GROUP_TAG_STABLE_NAME) == 0) {
        supp->groupByStbName = true;
      }
    }
  } else {
    // The filters are later read with strlen/strcmp, so they must be NUL-terminated. strncpy does not guarantee
    // that when the source fills the buffer; tstrncpy is the copy helper already used elsewhere in this file.
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
  }
  return TSDB_CODE_SUCCESS;
}

// Populate supp for a table count scan in three steps: grouping flags / name filters, then the slot ids of the
// group tag columns, then the slot id of the count column. All three slot ids are reset to -1 before lookup.
// The first failing step is logged and its code is returned.
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t rc = tblCountScanGetInputs(groupTags, tableName, supp);
  if (rc != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return rc;
  }

  // -1 marks "column not requested"
  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  rc = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (rc != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return rc;
  }

  rc = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (rc != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
  }
  return rc;
}
S
shenglian zhou 已提交
3008

S
slzhou 已提交
3009
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3010 3011 3012
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3013
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3014
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3015
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3016 3017 3018 3019 3020 3021 3022 3023 3024

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3025
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3026 3027
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3028 3029 3030
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3031 3032 3033

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3034 3035
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3047 3048 3049
// Write one result row (row index 0) into pRes and set its row count to 1.
//
// Only the columns whose slot id in pSupp is not -1 are written:
//   - db name:     dbName as a var-string; dbName must be non-empty (asserted)
//   - stable name: stbName as a var-string, or NULL when stbName is empty
//   - table count: count
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
    ASSERT(strlen(dbName));
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);

    char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);

    // the stored length must describe what was actually copied, not the (possibly longer) source
    varDataSetLen(varDbName, strlen(varDataVal(varDbName)));
    colDataAppend(colInfoData, 0, varDbName, false);
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
    if (strlen(stbName) != 0) {
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      // same bounded copy as for the db name above
      tstrncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
      varDataSetLen(varStbName, strlen(varDataVal(varStbName)));
      colDataAppend(colInfoData, 0, varStbName, false);
    } else {
      colDataAppendNULL(colInfoData, 0);
    }
  }

  if (pSupp->tbCountSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
    colDataAppend(colInfoData, 0, (char*)&count, false);
  }
  pRes->info.rows = 1;
}

S
slzhou 已提交
3079
// Build the next table-count row for the system databases.
//
// Fetches the table counts of the information-schema and performance-schema databases, then delegates to the
// grouped or the filtered builder depending on pSupp->groupByDbName.
// Returns pInfo->pRes when it holds at least one row, NULL otherwise.
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
  SSDataBlock*         pBlock = pInfo->pRes;
  STableCountScanSupp* pCountSupp = &pInfo->supp;

  size_t numOfInfoTables;
  getInfosDbMeta(NULL, &numOfInfoTables);

  size_t numOfPerfTables;
  getPerfDbMeta(NULL, &numOfPerfTables);

  if (pCountSupp->groupByDbName) {
    buildSysDbGroupedTableCount(pOperator, pInfo, pCountSupp, pBlock, numOfInfoTables, numOfPerfTables);
  } else {
    buildSysDbFilterTableCount(pOperator, pCountSupp, pBlock, numOfInfoTables, numOfPerfTables);
  }

  return (pBlock->info.rows > 0) ? pBlock : NULL;
}

S
slzhou 已提交
3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125
// Emit the single table-count row of a non-grouped system database scan and mark the operator completed.
//
// The db name filter selects the row: information schema, performance schema, or (empty filter) the sum of both
// with an empty db name. Any other filter value produces no row.
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  const char* dbFilter = pSupp->dbNameFilter;

  if (0 == strcmp(dbFilter, TSDB_INFORMATION_SCHEMA_DB)) {
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (0 == strcmp(dbFilter, TSDB_PERFORMANCE_SCHEMA_DB)) {
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  } else if (dbFilter[0] == '\0') {
    fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
  }

  setOperatorCompleted(pOperator);
}

// Emit the table-count row of one system database per call when grouping by db name.
//
// Group index 0 yields the information schema, index 1 the performance schema; any later index marks the
// operator completed without writing a row. The group index is advanced on every call.
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  switch (pInfo->currGrpIdx) {
    case 0:
      pRes->info.id.groupId = calcGroupId(TSDB_INFORMATION_SCHEMA_DB, strlen(TSDB_INFORMATION_SCHEMA_DB));
      fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
      break;
    case 1:
      pRes->info.id.groupId = calcGroupId(TSDB_PERFORMANCE_SCHEMA_DB, strlen(TSDB_PERFORMANCE_SCHEMA_DB));
      fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
      break;
    default:
      setOperatorCompleted(pOperator);
      break;
  }

  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3126
// Produce the next result block of the table count scan.
//
// The result block is cleaned on every call. A completed operator returns NULL. Otherwise the row is built from
// the system databases when the read handle carries an mnode pointer, and from the vnode's database when not.
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
  STableCountScanOperatorInfo* pScanInfo = pOperator->info;

  blockDataCleanup(pScanInfo->pRes);

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  if (pScanInfo->readHandle.mnd == NULL) {
    return buildVnodeDbTableCount(pOperator, pScanInfo, &pScanInfo->supp, pScanInfo->pRes);
  }

  return buildSysDbTableCount(pOperator, pScanInfo);
}

// Build the next table-count row for the database served by this vnode.
//
// Resolves the vnode's db name, then delegates to the grouped or the filtered builder depending on
// pSupp->groupByDbName. Returns pRes when it holds at least one row, NULL otherwise.
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
  char        dbName[TSDB_DB_NAME_LEN] = {0};
  const char* fullDbName = NULL;
  int32_t     vgId = 0;
  SName       name = {0};

  // the vnode reports a name that is parsed as account + db; only the db part is kept
  vnodeGetInfo(pInfo->readHandle.vnode, &fullDbName, &vgId);
  tNameFromString(&name, fullDbName, T_NAME_ACCT | T_NAME_DB);
  tNameGetDbName(&name, dbName);

  if (!pSupp->groupByDbName) {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
  } else {
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
  }

  if (pRes->info.rows > 0) {
    return pRes;
  }
  return NULL;
}

// Emit one grouped table-count row for the vnode's database.
//
// Grouped by db name only: a single row with the vnode's total table count, then the operator is completed.
// Grouped by stable name as well: one row per super table (in stbUidList order), then one row for the normal
// tables, then the operator is completed. The super table uid list is loaded on first use.
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  if (!pSupp->groupByStbName) {
    pRes->info.id.groupId = calcGroupId(dbName, strlen(dbName));
    int64_t numOfDbTables = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", numOfDbTables, pRes);
    setOperatorCompleted(pOperator);
    return;
  }

  if (pInfo->stbUidList == NULL) {
    pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
    if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
      qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
    }
  }

  if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
    // still inside the super table list
    tb_uid_t* pUid = taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
    buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, *pUid);
    pInfo->currGrpIdx++;
  } else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
    // one extra group right after the last super table: the normal tables
    buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);
    pInfo->currGrpIdx++;
  } else {
    setOperatorCompleted(pOperator);
  }
}

// Emit the single table-count row of a non-grouped vnode scan and mark the operator completed.
//
// When both the db name filter and the stable name filter are set, the row carries the child table count of
// that super table. In every other case it carries the vnode's total table count with an empty stable name.
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  bool hasDbFilter = (pSupp->dbNameFilter[0] != '\0');
  bool hasStbFilter = (pSupp->stbNameFilter[0] != '\0');

  if (hasDbFilter && hasStbFilter) {
    SMetaStbStats stbStats = {0};
    tb_uid_t      stbUid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter);
    metaGetStbStats(pInfo->readHandle.meta, stbUid, &stbStats);

    int64_t numOfChildTables = stbStats.ctbNum;
    fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numOfChildTables, pRes);
  } else {
    int64_t numOfVnodeTables = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", numOfVnodeTables, pRes);
  }

  setOperatorCompleted(pOperator);
}

// Emit the grouped row for the normal tables of the vnode's database.
//
// The group id is computed from "<dbName>." (db name followed by an empty stable name); the row carries the
// count returned by metaGetNtbNum and an empty stable name.
static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName) {
  char groupKey[TSDB_TABLE_FNAME_LEN] = {0};
  snprintf(groupKey, sizeof(groupKey), "%s.%s", dbName, "");
  pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));

  int64_t numOfNormalTables = metaGetNtbNum(pInfo->readHandle.meta);
  fillTableCountScanDataBlock(pSupp, dbName, "", numOfNormalTables, pRes);
}

// Emit the grouped row for one super table of the vnode's database.
//
// The super table name is looked up by uid, the group id is computed from "<dbName>.<stbName>", and the row
// carries the ctbNum reported by metaGetStbStats.
static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
  char superTableName[TSDB_TABLE_NAME_LEN] = {0};
  metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, superTableName);

  char groupKey[TSDB_TABLE_FNAME_LEN] = {0};
  snprintf(groupKey, sizeof(groupKey), "%s.%s", dbName, superTableName);
  pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));

  SMetaStbStats stbStats = {0};
  metaGetStbStats(pInfo->readHandle.meta, stbUid, &stbStats);

  int64_t numOfChildTables = stbStats.ctbNum;
  fillTableCountScanDataBlock(pSupp, dbName, superTableName, numOfChildTables, pRes);
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3241
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3242 3243
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3244
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3245 3246
  taosMemoryFreeClear(param);
}