scanoperator.c 132.9 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17
// clang-format off

18
#include "executorInt.h"
H
Haojun Liao 已提交
19
#include "filter.h"
20
#include "function.h"
21
#include "functionMgt.h"
L
Liu Jicong 已提交
22
#include "os.h"
H
Haojun Liao 已提交
23
#include "querynodes.h"
24
#include "systable.h"
H
Haojun Liao 已提交
25
#include "tname.h"
26
#include "ttime.h"
H
Haojun Liao 已提交
27 28 29 30 31 32 33 34

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
35 36
#include "operator.h"
#include "querytask.h"
H
Haojun Liao 已提交
37

38 39 40
#include "storageapi.h"
#include "wal.h"

D
dapan1121 已提交
41 42
/* Debug switch shared by the scan operators, initialised to 0.
 * NOTE(review): its readers are outside this view. */
int32_t scanDebug = 0;

/* Upper bound on the number of tables per multi-table reader (not referenced in the visible part of this file). */
#define MULTI_READER_MAX_TABLE_NUM 5000

/* Mark a scan-info object as performing a reverse scan. */
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)

/* Toggle an order variable in place: TSDB_ORDER_ASC becomes TSDB_ORDER_DESC, any other value becomes TSDB_ORDER_ASC. */
#define SWITCH_ORDER(n) ((n) = ((n) != TSDB_ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC)

/* Name of the stream scan operator. */
#define STREAM_SCAN_OP_NAME "StreamScanOperator"
47

H
Haojun Liao 已提交
48 49 50 51 52 53 54 55 56 57
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;
  SSortExecInfo          sortExecInfo;
} STableMergeScanExecInfo;

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
  uint64_t       uid;
  SSDataBlock*   inputBlock;
D
dapan1121 已提交
58
  STsdbReader*   dataReader;
H
Haojun Liao 已提交
59 60
} STableMergeScanSortSourceParam;

61 62 63 64 65 66 67 68 69 70
// State of the table count scan operator.
// NOTE(review): the code using these fields is outside this view; the field notes below are unconfirmed.
typedef struct STableCountScanOperatorInfo {
  SReadHandle  readHandle;
  SSDataBlock* pRes;  // presumably the result block handed to the parent operator

  STableCountScanSupp supp;

  int32_t currGrpIdx;  // presumably the index of the group currently being produced
  SArray* stbUidList;  // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;

L
Liu Jicong 已提交
71
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);

/*
 * Decide whether the current data block takes part in the scan.
 *
 * Block sampling is disabled. The ratio-based implementation is kept under `#if 0`
 * for reference, and every block is accepted.
 */
static bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#else
  (void)pInfo;  // unused while sampling is disabled
  return true;
#endif
}

86
/* Flip the scan order (ASC <-> DESC) of the first numOfOutput function contexts in pCtx. */
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SqlFunctionCtx* pOne = &pCtx[idx];
    SWITCH_ORDER(pOne->order);
    ++idx;
  }
}

92
/*
 * Check whether a data block may span more than one window of the upstream interval operator.
 *
 * The first window is aligned to the block's leading key: skey for ascending scans, ekey for
 * descending scans. The block overlaps several windows if it reaches beyond that first window,
 * or if a later window (stepping in scan order) still intersects the block.
 *
 * Returns false when pInterval->interval is 0, which means the upstream operator is not an
 * interval operator.
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  if (pInterval->interval == 0) {
    return false;
  }

  const STimeWindow* pBlockWin = &pBlockInfo->window;
  STimeWindow        win = {0};

  if (order == TSDB_ORDER_ASC) {
    win = getAlignQueryTimeWindow(pInterval, pBlockWin->skey);
    ASSERT(win.ekey >= pBlockWin->skey);

    // the block runs past the end of its first window
    if (win.ekey < pBlockWin->ekey) {
      return true;
    }

    for (;;) {
      getNextTimeWindow(pInterval, &win, order);
      if (win.skey > pBlockWin->ekey) {
        return false;
      }

      ASSERT(win.ekey > pBlockWin->ekey);
      if (TMAX(win.skey, pBlockWin->skey) <= pBlockWin->ekey) {
        return true;
      }
    }
  }

  // descending scan: mirror image of the ascending case, anchored at the block's end key
  win = getAlignQueryTimeWindow(pInterval, pBlockWin->ekey);
  ASSERT(win.skey <= pBlockWin->ekey);

  // the block starts before the beginning of its first window
  if (win.skey > pBlockWin->skey) {
    return true;
  }

  for (;;) {
    getNextTimeWindow(pInterval, &win, order);
    if (win.ekey < pBlockWin->skey) {
      return false;
    }

    ASSERT(win.skey < pBlockWin->skey);
    if (pBlockWin->skey <= TMIN(win.ekey, pBlockWin->ekey)) {
      return true;
    }
  }
}

143 144 145 146 147 148 149 150 151 152 153
/*
 * For a table scan operator, fetch the intermediate result row that the upstream aggregate keeps
 * for groupId.
 *
 * On success, *pPage receives the buffer page holding the row. The caller releases that page with
 * releaseBufPage(). Returns NULL in three cases: the operator is not a table scan, no row exists
 * for the group, or the page cannot be obtained.
 */
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  STableScanInfo* pScanInfo = pOperator->info;

  // build the hash key identifying the group's result row
  int64_t keyBuf[2] = {0};
  SET_RES_WINDOW_KEY((char*)keyBuf, &groupId, sizeof(groupId), groupId);

  SResultRowPosition* pPos = (SResultRowPosition*)tSimpleHashGet(
      pScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
  if (pPos == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pScanInfo->base.pdInfo.pAggSup->pResultBuf, pPos->pageId);
  return (*pPage == NULL) ? NULL : (SResultRow*)((char*)(*pPage) + pPos->offset);
}

169 170 171 172 173 174 175 176
/*
 * Record uid in the scan's ignore list, storing the current pTableScanInfo->scanTimes as its value.
 * The hash table is created lazily, sized after the table list.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the hash table cannot be created, or the
 * failure reported by taosHashPut (previously this failure was silently ignored).
 */
static int32_t insertTableToScanIgnoreList(STableScanInfo* pTableScanInfo, uint64_t uid) {
  if (NULL == pTableScanInfo->pIgnoreTables) {
    int32_t tableNum = taosArrayGetSize(pTableScanInfo->base.pTableListInfo->pTableList);
    pTableScanInfo->pIgnoreTables =
        taosHashInit(tableNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (NULL == pTableScanInfo->pIgnoreTables) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t code = taosHashPut(pTableScanInfo->pIgnoreTables, &uid, sizeof(uid), &pTableScanInfo->scanTimes,
                             sizeof(pTableScanInfo->scanTimes));
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): taosHashPut may signal failure as -1 with the reason in terrno; prefer terrno then.
    return (code == -1 && terrno != TSDB_CODE_SUCCESS) ? terrno : code;
  }

  return TSDB_CODE_SUCCESS;
}

183 184
/*
 * Dynamic pruning: ask each pushed-down aggregate whether the block is still needed, given the
 * partial result already accumulated for the block's group.
 *
 * If no function requires the data, *status is set to FUNC_DATA_REQUIRED_NOT_LOAD and the block's
 * table is added to the scan ignore list. The return value is TSDB_CODE_SUCCESS, or the error from
 * insertTableToScanIgnoreList. *status is left untouched when there is nothing to prune against
 * (no pushed-down expressions, or no result row for the group).
 */
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScanInfo = pOperator->info;
  SExprSupp*      pSup = pScanInfo->base.pdInfo.pExprSup;

  if (pSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pPage = NULL;
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // stop at the first function that still needs this block's data
  bool dataRequired = false;
  for (int32_t i = 0; i < pSup->numOfExprs && !dataRequired; ++i) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pSup->rowEntryInfoOffset);
    int32_t reqStatus = fmFuncDynDataRequired(pSup->pCtx[i].functionId, pEntry, &pBlockInfo->window);
    dataRequired = (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD);
  }

  // release buffer pages
  releaseBufPage(pScanInfo->base.pdInfo.pAggSup->pResultBuf, pPage);

  if (dataRequired) {
    return TSDB_CODE_SUCCESS;
  }

  *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  return insertTableToScanIgnoreList(pScanInfo, pBlockInfo->id.uid);
}

H
Haojun Liao 已提交
224
/*
 * Evaluate the filter's value ranges against the block-level SMA (pre-aggregated column statistics).
 * Returns true when the block must be kept. Without SMA data or without a filter, nothing can be
 * ruled out and the block is kept.
 */
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                               int32_t numOfRows) {
  bool canEvaluate = (pColsAgg != NULL) && (pFilterInfo != NULL);
  return canEvaluate ? filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows) : true;
}

H
Haojun Liao 已提交
234
/*
 * Ask the tsdb reader for the SMA of the current block.
 *
 * Returns true only when every column has SMA and none of it is null. A reader error is raised
 * through T_LONG_JMP on pTaskInfo->env.
 */
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  bool allColumnsHaveAgg = true;
  bool hasNullSMA = false;

  int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderRetrieveBlockSMAInfo(pTableScanInfo->dataReader, pBlock,
                                                                               &allColumnsHaveAgg, &hasNullSMA);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return allColumnsHaveAgg && !hasNullSMA;
}

H
Haojun Liao 已提交
250
/*
 * Fill the tag / tbname pseudo columns of pBlock (first `rows` rows) when the scan has any.
 *
 * TSDB_CODE_PAR_TABLE_NOT_EXIST is tolerated because the table may have been dropped during the
 * scan. Any other error is raised through T_LONG_JMP. terrno is reset afterwards.
 */
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
                               int32_t rows) {
  SExprSupp* pSup = &pTableScanInfo->pseudoSup;
  if (pSup->numOfExprs <= 0) {
    return;
  }

  int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
                                        GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);
  bool tableDropped = (code == TSDB_CODE_PAR_TABLE_NOT_EXIST);
  if (code != TSDB_CODE_SUCCESS && !tableDropped) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // reset the error code.
  terrno = 0;
}

267
/*
 * Apply the remaining OFFSET and then the LIMIT of pLimitInfo to pBlock, in place.
 *
 * A block fully consumed by the offset is emptied. When the limit is reached, the block is cut to
 * the rows still allowed. numOfOutputRows is advanced by the rows that survive. Returns true
 * exactly when the limit has been reached with this block.
 */
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SLimit*     pLimit = &pLimitInfo->limit;
  const char* id = GET_TASKID(pTaskInfo);

  // 1. consume the remaining offset
  if (pLimitInfo->remainOffset > 0) {
    if (pLimitInfo->remainOffset < pBlock->info.rows) {
      blockDataTrimFirstRows(pBlock, pLimitInfo->remainOffset);
      pLimitInfo->remainOffset = 0;
    } else {
      pLimitInfo->remainOffset -= pBlock->info.rows;
      blockDataEmpty(pBlock);
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
      return false;
    }
  }

  // 2. apply the limit (-1 means no limit)
  bool limitReached =
      (pLimit->limit != -1) && (pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows));
  if (limitReached) {
    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, keep);
  }

  pLimitInfo->numOfOutputRows += pBlock->info.rows;

  if (limitReached) {
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
  }
  return limitReached;
}

H
Haojun Liao 已提交
297
/*
 * Decide how much of the current data block the scan needs, and load it accordingly.
 *
 * The decision starts from pTableScanInfo->dataBlockLoadFlag. It is escalated to a full data load
 * when the operator has a filter, or when the block may span more than one upstream interval
 * window (see overlapWithTimeWindow). The outcome is reported through *status:
 *   - FUNC_DATA_REQUIRED_FILTEROUT:     block dropped (by the flag, its SMA or dynamic pruning);
 *   - FUNC_DATA_REQUIRED_NOT_LOAD:      only the tag/tbname pseudo columns are filled;
 *   - FUNC_DATA_REQUIRED_SMA_LOAD:      block SMA loaded instead of the rows;
 *   - FUNC_DATA_REQUIRED_ALL_FILTEROUT: dynamic pruning has excluded every table of the list;
 *   - FUNC_DATA_REQUIRED_DATA_LOAD:     rows loaded, filtered and cut by offset/limit.
 *
 * Returns TSDB_CODE_SUCCESS, or an error code when the rows cannot be retrieved or the
 * dynamic-prune bookkeeping fails.
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*            pAPI = &pTaskInfo->storageAPI;
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
  SDataBlockInfo*         pBlockInfo = &pBlock->info;
  int32_t                 code = TSDB_CODE_SUCCESS;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  bool loadSMA = false;
  *status = pTableScanInfo->dataBlockLoadFlag;
  if (pOperator->exprSupp.pFilterInfo != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    // the rows of this block were already added to totalRows on entry; do not count them twice
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows,
           pBlockInfo->id.uid);
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
    pCost->skipBlocks += 1;
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
    pCost->loadBlockStatis += 1;
    loadSMA = true;  // mark the operation of load sma;
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
      return TSDB_CODE_SUCCESS;
    }

    // failed to load the block sma data, data block statistics does not exist, load data block instead
    qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
    *status = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      size_t size = taosArrayGetSize(pBlock->pDataBlock);
      bool   keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
      if (!keep) {
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
               GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
        pCost->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

        pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results; a failure here (e.g. OOM while
  // recording the ignored table) used to be silently dropped
  code = doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (code != TSDB_CODE_SUCCESS) {
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
    return code;
  }

  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);

    // once every table of the list is ignored, the whole scan can stop
    STableScanInfo* p1 = pOperator->info;
    if (taosHashGetSize(p1->pIgnoreTables) == taosArrayGetSize(p1->base.pTableListInfo->pTableList)) {
      *status = FUNC_DATA_REQUIRED_ALL_FILTEROUT;
    } else {
      *status = FUNC_DATA_REQUIRED_FILTEROUT;
    }
    return TSDB_CODE_SUCCESS;
  }

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SSDataBlock* p = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (p == NULL) {
    // NOTE(review): relies on the reader having set terrno on failure
    return terrno;
  }

  ASSERT(p == pBlock);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);

  // restore the previous value; re-added below with the row count that survives filter and limit
  pCost->totalRows -= pBlock->info.rows;

  if (pOperator->exprSupp.pFilterInfo != NULL) {
    int64_t st = taosGetTimestampUs();
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);

    double el = (taosGetTimestampUs() - st) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;

    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
  }

  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
  if (limitReached) {  // set operator flag is done
    setOperatorCompleted(pOperator);
  }

  pCost->totalRows += pBlock->info.rows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
427
/*
 * Switch the scan to reverse (descending) order. This marks the scan flag as REVERSE_SCAN, flips
 * the order of the first numOfOutput function contexts, sets the query order to DESC and swaps the
 * bounds of the query time window.
 */
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  STimeWindow* pQueryWin = &pTableScanInfo->cond.twindows;

  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);
  pTableScanInfo->cond.order = TSDB_ORDER_DESC;

  TSWAP(pQueryWin->skey, pQueryWin->ekey);
}

436 437
typedef struct STableCachedVal {
  const char* pName;
438
  STag*       pTags;
439 440
} STableCachedVal;

441 442 443 444 445 446 447 448 449 450 451
static void freeTableCachedVal(void* param) {
  if (param == NULL) {
    return;
  }

  STableCachedVal* pVal = param;
  taosMemoryFree((void*)pVal->pName);
  taosMemoryFree(pVal->pTags);
  taosMemoryFree(pVal);
}

H
Haojun Liao 已提交
452 453
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
454
  pVal->pName = taosStrdup(pMetaReader->me.name);
H
Haojun Liao 已提交
455 456 457 458
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
459
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
460 461 462 463 464 465 466
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

467
/*
 * Deleter registered with the table meta LRU cache (see taosLRUCacheInsert in addTagPseudoColumnData).
 * Only the value, an STableCachedVal, is released. The key, its length and the user data are unused.
 */
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value, void* ud) {
  (void)ud;
  (void)keyLen;
  (void)key;
  freeTableCachedVal(value);
}
474

475 476 477 478 479
/* Set the first pBlock->info.rows rows of the destination column of each of the numOfExpr expressions to NULL. */
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
  int32_t idx = 0;
  while (idx < numOfExpr) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pExpr[idx].base.resSchema.slotId);
    colDataSetNNULL(pCol, 0, pBlock->info.rows);
    ++idx;
  }
}

484 485
/*
 * Log a failed table meta lookup for pBlock's table, taking the reason from terrno. If the table
 * has been dropped, the pseudo columns are also filled with NULL, because the callers ignore
 * TSDB_CODE_PAR_TABLE_NOT_EXIST and proceed with the block.
 */
static void reportTableMetaFetchError(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr,
                                      const char* idStr) {
  if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
          pBlock->info.id.uid, tstrerror(terrno), idStr);

    // append null value before return to caller, since the caller will ignore this error code and proceed
    doSetNullValue(pBlock, pExpr, numOfExpr);
  } else {
    qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
           idStr);
  }
}

/*
 * Fill the tbname and tag pseudo columns of pBlock for table pBlock->info.id.uid.
 *
 * For every expression, the first `rows` rows of its destination column (resSchema.slotId) are
 * rewritten. pBlock->info.rows is temporarily set to `rows` and restored on every exit path.
 * The table name and tags come from the meta reader, going through the LRU cache pCache when one
 * is given.
 *
 * Returns TSDB_CODE_SUCCESS or the failing step's error code. If the table no longer exists, the
 * columns are set to NULL and terrno (TSDB_CODE_PAR_TABLE_NOT_EXIST) is returned.
 *
 * Fixes compared with the previous version:
 *  - the name/tags in use now always come from the meta reader (alive until clearReader at _end),
 *    not from the value handed to the cache, which was freed on insert failure and then read;
 *  - a NULL result of createTableCacheVal is no longer dereferenced;
 *  - pBlock->info.rows is restored on error paths too;
 *  - a table without a tag blob yields NULL tag values instead of calling extractTagVal on it;
 *  - failures of colDataSetVal for JSON tags are propagated.
 */
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
  // currently only the tbname pseudo column
  if (numOfExpr <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  // backup the rows; restored at _end
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

  bool            freeReader = false;
  STableCachedVal val = {0};
  SMetaReader     mr = {0};

  // 1. check if it is existed in meta cache
  if (pCache == NULL) {
    pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_NOLOCK, &pHandle->api.metaFn);
    freeReader = true;

    code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
    if (code != TSDB_CODE_SUCCESS) {
      reportTableMetaFetchError(pBlock, pExpr, numOfExpr, idStr);
      code = terrno;
      goto _end;
    }

    pHandle->api.metaReaderFn.readerReleaseLock(&mr);

    val.pName = mr.me.name;
    val.pTags = (mr.me.type == TSDB_CHILD_TABLE) ? (STag*)mr.me.ctbEntry.pTags : NULL;
  } else {
    pCache->metaFetch += 1;

    LRUHandle* h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
    if (h == NULL) {
      pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pHandle->api.metaFn);
      freeReader = true;

      code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
      if (code != TSDB_CODE_SUCCESS) {
        reportTableMetaFetchError(pBlock, pExpr, numOfExpr, idStr);
        code = terrno;
        goto _end;
      }

      pHandle->api.metaReaderFn.readerReleaseLock(&mr);

      // Read from the reader's own copy, which stays valid until clearReader() at _end. The copy
      // handed to the cache below may be released by the cache (or by us on insert failure).
      val.pName = mr.me.name;
      val.pTags = (mr.me.type == TSDB_CHILD_TABLE) ? (STag*)mr.me.ctbEntry.pTags : NULL;

      STableCachedVal* pVal = createTableCacheVal(&mr);
      if (pVal == NULL) {
        qWarn("failed to create cached meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
              idStr);
      } else {
        int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
                                         sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
        if (ret != TAOS_LRU_STATUS_OK) {
          qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
          freeTableCachedVal(pVal);
        }
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;

      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
    }

    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
  }

  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);

    int32_t functionId = pExpr1->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
      continue;
    }

    // these are tags
    STagVal tagVal = {0};
    tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
    const char* p = (val.pTags != NULL)
                        ? pHandle->api.metaFn.extractTagVal(val.pTags, pColInfoData->info.type, &tagVal)
                        : NULL;

    char* data = NULL;
    if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
      data = tTagValToData((const STagVal*)p, false);
    } else {
      data = (char*)p;
    }

    bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
    if (isNullVal) {
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
    } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
      code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
      if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
        taosMemoryFree(data);
      }
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    } else {  // todo opt for json tag
      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
        code = colDataSetVal(pColInfoData, i, data, false);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }
    }
  }

_end:
  // restore the rows
  pBlock->info.rows = backupRows;
  if (freeReader) {
    pHandle->api.metaReaderFn.clearReader(&mr);
  }

  return code;
}

H
Haojun Liao 已提交
633
/*
 * Evaluate the scalar pseudo-column function functionId (e.g. tbname) and write its result into
 * pColInfoData.
 *
 * The table name is wrapped in a one-row VARCHAR column, which is passed as the function input
 * with numOfRows = pBlock->info.rows. If the function has no process callback, an error is logged
 * and pColInfoData is left untouched.
 */
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

  // varstr-encoded copy of the table name
  char nameBuf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(nameBuf, name)

  SColumnInfoData nameCol = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, sizeof(nameBuf), 1);
  colInfoDataEnsureCapacity(&nameCol, 1, false);
  colDataSetVal(&nameCol, 0, nameBuf, false);

  SScalarParam input = {.numOfRows = pBlock->info.rows, .columnData = &nameCol};
  SScalarParam output = {.columnData = pColInfoData};

  if (fpSet.process == NULL) {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  } else {
    fpSet.process(&input, 1, &output);
  }

  colDataDestroy(&nameCol);
}

658
/*
 * Pull data blocks from the tsdb reader until one survives loading and filtering, then return it.
 *
 * Returns pResBlock with its groupId and scanFlag set, or NULL in three cases: the reader is
 * exhausted, the operator has been completed, or dynamic pruning has filtered out every table.
 * Reader errors and task cancellation are raised through T_LONG_JMP on pTaskInfo->env, after
 * releasing the reader's current block.
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
  SSDataBlock*    pBlock = pTableScanInfo->pResBlock;

  const int64_t st = taosGetTimestampUs();

  for (;;) {
    bool    hasNext = false;
    int32_t code = pAPI->tsdReader.tsdNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
    if (code) {
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reader exhausted
    if (!hasNext) {
      return NULL;
    }

    if (isTaskKilled(pTaskInfo)) {
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (pOperator->status == OP_EXEC_DONE) {
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->base.dataReader);
      return NULL;
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pTableScanInfo->sample)) {
      continue;
    }

    if (pBlock->info.id.uid != 0) {
      pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
    }

    uint32_t status = 0;
    code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // every table has been pruned: nothing more to scan
    if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
      return NULL;
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    pOperator->resultInfo.totalRows = pTableScanInfo->base.readRecorder.totalRows;
    pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;
    pBlock->info.scanFlag = pTableScanInfo->base.scanFlag;
    return pBlock;
  }
}

H
Haojun Liao 已提交
725
/*
 * Return the next block from the tables currently assigned to the scan reader.
 *
 * The plan may request several passes over the same data: scanInfo.numOfAsc ascending passes followed by
 * scanInfo.numOfDesc descending passes. pInfo->scanTimes counts the passes already finished. Between passes the
 * ignore-table hash is cleared and the reader is rewound with tsdReaderResetStatus.
 *
 * Returns NULL when no reader is open (no qualified table exists), when the operator is already done, or once
 * every requested pass has been exhausted.
 */
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
  STableScanBase* pBase = &pInfo->base;

  // A NULL reader means no qualified table exists, so there is nothing to read.
  if (pBase->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // Ascending passes always run first.
  while (pInfo->scanTimes < pInfo->scanInfo.numOfAsc) {
    SSDataBlock* pBlock = doTableScanImpl(pOperator);
    if (pBlock != NULL) {
      return pBlock;
    }

    pInfo->scanTimes++;
    taosHashClear(pInfo->pIgnoreTables);

    if (pInfo->scanTimes >= pInfo->scanInfo.numOfAsc) {
      break;  // the last ascending pass has just finished
    }

    // Another ascending pass is required: rewind the reader and force a full data load again.
    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = MAIN_SCAN;
    pBase->dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
    qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));
    pAPI->tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
  }

  int32_t total = pInfo->scanInfo.numOfAsc + pInfo->scanInfo.numOfDesc;
  if (pInfo->scanTimes >= total) {
    return NULL;
  }

  // Flip the reader to descending order once, unless it is already configured that way.
  if (pBase->cond.order == TSDB_ORDER_ASC) {
    prepareForDescendingScan(pBase, pOperator->exprSupp.pCtx, 0);
    pAPI->tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
  }

  while (pInfo->scanTimes < total) {
    SSDataBlock* pBlock = doTableScanImpl(pOperator);
    if (pBlock != NULL) {
      return pBlock;
    }

    pInfo->scanTimes++;
    taosHashClear(pInfo->pIgnoreTables);

    if (pInfo->scanTimes >= total) {
      break;  // the last descending pass has just finished
    }

    // Another descending pass is required: rewind the reader.
    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = MAIN_SCAN;
    qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
    pAPI->tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
  }

  return NULL;
}

/*
 * next() entry of the table scan operator.
 *
 * TABLE_SCAN__TABLE_ORDER: tables in the table list are scanned one at a time. When the current table yields no more
 * data, the reader is pointed at the next table (the list is read under the task's read latch).
 *
 * Otherwise tables are scanned group by group. The reader is opened lazily for the first output group
 * (currentGroupId == -1). When a group is exhausted, the reader is re-pointed at the following group, the operator
 * status and per-group limit are reset, and scanning continues. A group may produce no block at all (e.g. every row
 * is filtered out), so scanning keeps advancing until some group yields data or all groups have been visited.
 *
 * Returns the next block, or NULL when every table/group has been scanned.
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;

  // scan table one by one sequentially
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    int32_t       numOfTables = 0;
    STableKeyInfo tInfo = {0};

    while (1) {
      SSDataBlock* result = doGroupedTableScan(pOperator);
      if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
        return result;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;

      taosRLockLatch(&pTaskInfo->lock);
      numOfTables = tableListGetSize(pInfo->base.pTableListInfo);

      if (pInfo->currentTable >= numOfTables) {
        qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
        taosRUnLockLatch(&pTaskInfo->lock);
        return NULL;
      }

      tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
      taosRUnLockLatch(&pTaskInfo->lock);

      pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
             pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));

      pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;
    }
  } else {  // scan table group by group sequentially
    if (pInfo->currentGroupId == -1) {
      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
        setOperatorCompleted(pOperator);
        return NULL;
      }

      int32_t        num = 0;
      STableKeyInfo* pList = NULL;
      tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
      ASSERT(pInfo->base.dataReader == NULL);

      int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
                                    (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables);
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }

      if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
        pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
      }
    }

    SSDataBlock* result = doGroupedTableScan(pOperator);
    if (result != NULL) {
      return result;
    }

    // The current group is exhausted. Advance through the following groups until one of them yields a block:
    // stopping after a single empty group would silently drop every group behind it.
    while (1) {
      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
        setOperatorCompleted(pOperator);
        return NULL;
      }

      // reset value for the next group data output
      pOperator->status = OP_OPENED;
      resetLimitInfoForNextGroup(&pInfo->base.limitInfo);

      int32_t        num = 0;
      STableKeyInfo* pList = NULL;
      tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);

      pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num);
      pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;

      result = doGroupedTableScan(pOperator);
      if (result != NULL) {
        return result;
      }

      // Do not keep walking the remaining groups once the task has been killed.
      if (isTaskKilled(pTaskInfo)) {
        return NULL;
      }

      // The scan of this group ended the whole operator (status set to done while scanning): stop here.
      if (pOperator->status == OP_EXEC_DONE) {
        setOperatorCompleted(pOperator);
        return NULL;
      }
    }
  }
}

879 880
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
881
  STableScanInfo*         pTableScanInfo = pOptr->info;
H
Haojun Liao 已提交
882
  *pRecorder = pTableScanInfo->base.readRecorder;
883 884 885 886 887
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

888
/*
 * Release every resource owned by a STableScanBase: the query condition, the tsdb reader, the column match list,
 * the table list, the table-meta LRU cache and the pseudo-column expression support.
 *
 * pAPI is the reader API table used to close the reader. It may still be zero-filled when this runs from
 * createTableScanOperatorInfo's error path before the API table has been copied in, so the close callback is only
 * invoked when present (no reader can have been opened without it).
 */
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
  cleanupQueryTableDataCond(&pBase->cond);

  if (pAPI->tsdReaderClose != NULL) {
    pAPI->tsdReaderClose(pBase->dataReader);
  }
  pBase->dataReader = NULL;

  if (pBase->matchInfo.pList != NULL) {
    taosArrayDestroy(pBase->matchInfo.pList);
  }

  tableListDestroy(pBase->pTableListInfo);
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}

static void destroyTableScanOperatorInfo(void* param) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  blockDataDestroy(pTableScanInfo->pResBlock);
906
  taosHashCleanup(pTableScanInfo->pIgnoreTables);
H
Haojun Liao 已提交
907
  destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
D
dapan1121 已提交
908
  taosMemoryFreeClear(param);
909 910
}

911
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
912
                                           STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
X
Xiaoyu Wang 已提交
913
  int32_t         code = 0;
H
Haojun Liao 已提交
914 915 916
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
917
    code = TSDB_CODE_OUT_OF_MEMORY;
918
    goto _error;
H
Haojun Liao 已提交
919 920
  }

921
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
922
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
923 924

  int32_t numOfCols = 0;
X
Xiaoyu Wang 已提交
925
  code =
H
Haojun Liao 已提交
926
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
927 928 929 930
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
931
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
932
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
933
  if (code != TSDB_CODE_SUCCESS) {
934
    goto _error;
935 936
  }

H
Haojun Liao 已提交
937
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
938
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
939
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
940
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
941 942
  }

943
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
G
Ganlin Zhao 已提交
944
  pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
H
Haojun Liao 已提交
945

H
Haojun Liao 已提交
946 947
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
948 949
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

950 951
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
952

H
Haojun Liao 已提交
953
  pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
H
Haojun Liao 已提交
954
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
955
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
X
Xiaoyu Wang 已提交
956
  //  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
957

H
Haojun Liao 已提交
958 959 960
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
961 962
  }

wmmhello's avatar
wmmhello 已提交
963
  pInfo->currentGroupId = -1;
964
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
965
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
966

L
Liu Jicong 已提交
967 968
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
969
  pOperator->exprSupp.numOfExprs = numOfCols;
970

971
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
972 973
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
974 975 976
    code = terrno;
    goto _error;
  }
977

D
dapan1121 已提交
978 979 980 981
  if (scanDebug) {
    pInfo->countOnly = true;
  }

H
Haojun Liao 已提交
982
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
983 984
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
985 986 987

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
988
  return pOperator;
989

990
_error:
991 992 993
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
994

995 996
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
997
  return NULL;
H
Haojun Liao 已提交
998 999
}

1000
/*
 * Build a sequential table scan operator that drives doTableScanImpl directly over an already opened reader.
 *
 * pReadHandle is stored as the operator's data reader. No close callback is registered for this operator.
 * Returns NULL with pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY when either allocation fails.
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    taosMemoryFree(pInfo);
    taosMemoryFree(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->base.dataReader = pReadHandle;

  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
  return pOperator;
}

1013
/* Drop every buffered stream block and rewind the index of the next block to consume. */
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  int32_t numOfBuffered = (int32_t)taosArrayGetSize(pInfo->pBlockLists);
  qDebug("clear buff blocks:%d", numOfBuffered);

  pInfo->validBlockIndex = 0;
  taosArrayClear(pInfo->pBlockLists);
}

1019
/* True when the downstream (parent) operator of this stream scan is a session-window aggregation. */
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  bool isSession = (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == pInfo->windowSup.parentType);
  return isSession;
}

1023
/* True when the downstream (parent) operator of this stream scan is a state-window aggregation. */
static bool isStateWindow(SStreamScanInfo* pInfo) {
  bool isState = (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == pInfo->windowSup.parentType);
  return isState;
}
5
54liuyao 已提交
1026

L
Liu Jicong 已提交
1027
/* True when the parent operator is any flavor of stream interval aggregation (single, semi or final). */
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
      return true;
    default:
      return false;
  }
}

/*
 * True only for the plain (non-semi, non-final) stream interval parent.
 * The misspelled name ("Signle") is kept as-is because callers refer to it.
 */
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  bool isSingle = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == pInfo->windowSup.parentType);
  return isSingle;
}

1037 1038 1039 1040
/* True for an interval parent whose sliding step differs from its interval length. */
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.sliding != pInfo->interval.interval;
}

1041
/* Store in pInfo->groupId the uint64 group id found at row rowIndex of column groupColIndex of pBlock. */
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  SColumnInfoData* pGroupIdCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  ASSERT(rowIndex < pBlock->info.rows);
  pInfo->groupId = ((uint64_t*)pGroupIdCol->pData)[rowIndex];
}

1048
/*
 * Re-target a table scan operator at time window *pWin, reading data versions [0, ver].
 *
 * The pass counter is rewound and the current reader is closed. Because currentGroupId is set back to -1,
 * doTableScan (group-by-group mode) opens a fresh reader for the first group on its next call.
 */
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t ver) {
  STableScanBase* pBase = &pTableScanInfo->base;

  pBase->cond.twindows = *pWin;
  pBase->cond.startVersion = 0;
  pBase->cond.endVersion = ver;

  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;

  pBase->readerAPI.tsdReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;
}

L
Liu Jicong 已提交
1058 1059
/*
 * Read one block of table tbUid restricted to [startTs, endTs] and to data versions up to maxVersion.
 * Callers in this file pass the incoming block's version minus one.
 *
 * A temporary reader is opened with a copy of the scan operator's condition, a single block is fetched into the
 * operator's pResBlock, tag columns are filled and the block's group id is resolved from the table list. The
 * temporary reader is always closed before returning or unwinding.
 *
 * Returns pResBlock when it holds at least one row, NULL otherwise. Reader errors set terrno and long-jump to the
 * task environment.
 */
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};

  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
  SQueryTableDataCond cond = pTableScanInfo->base.cond;

  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  STsdbReader* pReader = NULL;
  int32_t      code = pAPI->tsdReader.tsdReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
                                     (void**)&pReader, GET_TASKID(pTaskInfo), false, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  bool hasNext = false;
  code = pAPI->tsdReader.tsdNextDataBlock(pReader, &hasNext);
  if (code != TSDB_CODE_SUCCESS) {
    // The long jump never returns here, so release the reader opened above first or it leaks.
    pAPI->tsdReader.tsdReaderClose(pReader);
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  if (hasNext) {
    /*SSDataBlock* p = */ pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, NULL);
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
    pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
  }

  pAPI->tsdReader.tsdReaderClose(pReader);
  qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);

  return pBlock->info.rows > 0 ? pBlock : NULL;
}

/*
 * Compute the partition group id of the row of table uid at timestamp ts, as stored at data version maxVersion.
 * Returns 0 when no such row can be read.
 */
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pRowBlock = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  bool         found = (pRowBlock != NULL) && (pRowBlock->info.rows != 0);
  if (!found) {
    return 0;
  }

  ASSERT(pRowBlock->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pRowBlock, 0);
}

5
54liuyao 已提交
1113
/* Look up the group id of table uid in the table list of the underlying table scan operator. */
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  STableScanInfo* pScan = (STableScanInfo*)pInfo->pTableScanOp->info;
  STableListInfo* pTableList = pScan->base.pTableListInfo;
  return getTableGroupId(pTableList, uid);
}

5
54liuyao 已提交
1118 1119 1120 1121 1122 1123 1124 1125
/*
 * Resolve the group id of a changed row. It is computed from the row's data when the partition expression needs
 * evaluation (partitionSup.needCalc); otherwise it comes from the table list by uid.
 */
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1126
/*
 * Point the table scan operator at the next time range listed in pBlock, starting at row *pRowIndex.
 *
 * The first row's group id goes to pInfo->groupId. For sliding intervals, that row's calculate start/end ts go to
 * pInfo->updateWin. Following rows of the same group whose start or end equals the current range start are merged
 * into the range. On return *pRowIndex is the first row that was not merged. The scan operator is then reset to
 * the merged window (versions up to pUpdateInfo->maxDataVersion) and marked OP_OPENED.
 *
 * Returns false when pBlock is empty or all of its rows have been consumed, true otherwise.
 */
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  if (pBlock->info.rows == 0 || (*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  TSKEY*    startData = (TSKEY*)pStartCol->pData;
  TSKEY*    endData = (TSKEY*)pEndCol->pData;
  uint64_t* gpData = (uint64_t*)pGroupCol->pData;

  int32_t     row = *pRowIndex;
  STimeWindow range = {.skey = startData[row], .ekey = endData[row]};
  uint64_t    groupId = gpData[row];

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, row);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = ((TSKEY*)pCalStartCol->pData)[row];
    pInfo->updateWin.ekey = ((TSKEY*)pCalEndCol->pData)[row];
  }

  // Absorb the following rows of the same group that are anchored at the current range start.
  for (row++; row < pBlock->info.rows; row++) {
    bool sameGroup = (groupId == gpData[row]);
    if (sameGroup && range.skey == startData[row]) {
      range.ekey = TMAX(range.ekey, endData[row]);
      continue;
    }
    if (sameGroup && range.skey == endData[row]) {
      range.skey = TMIN(range.skey, startData[row]);
      continue;
    }

    ASSERT(!(range.skey > startData[row] && range.ekey < endData[row]) ||
           !(isInTimeWindow(&range, startData[row], 0) || isInTimeWindow(&range, endData[row], 0)));
    break;
  }
  *pRowIndex = row;

  qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, range.skey, range.ekey, pInfo->pUpdateInfo->maxDataVersion);
  resetTableScanInfo(pInfo->pTableScanOp->info, &range, pInfo->pUpdateInfo->maxDataVersion);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1179
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1180
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1181
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1182
  dumyInfo.cur.pageId = -1;
1183
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1184 1185
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1186
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1187

5
54liuyao 已提交
1188
  while (1) {
1189 1190 1191
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1192
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1193 1194 1195 1196 1197
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1198
    }
5
54liuyao 已提交
1199

5
54liuyao 已提交
1200 1201 1202
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1203
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1204
    endWin = preWin;
5
54liuyao 已提交
1205
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1206 1207 1208 1209 1210 1211
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1212

L
Liu Jicong 已提交
1213
/*
 * Re-scan the time ranges listed in pSDB and return the next block of data that belongs to pInfo->groupId.
 *
 * *pRowIndex is the cursor into pSDB; prepareRangeScan advances it and re-targets the table scan operator at the
 * next range whenever the current one is exhausted. Blocks are filtered with the scan operator's filter; with
 * partition calculation only rows whose computed group equals pInfo->groupId are kept. The returned block carries
 * pInfo->updateWin as calWin.
 *
 * When every range has been scanned: pSDB is cleaned up, the cursor and pInfo->updateWin are reset, the table scan
 * reader is closed, and NULL is returned. tsColIndex is not used. Allocation failure long-jumps to the task env.
 */
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  qInfo("do stream range scan. windows index:%d", *pRowIndex);
  bool prepareRes = true;
  while (1) {
    SSDataBlock* pResult = NULL;
    pResult = doTableScan(pInfo->pTableScanOp);
    if (!pResult) {
      prepareRes = prepareRangeScan(pInfo, pSDB, pRowIndex);
      // scan next window data
      pResult = doTableScan(pInfo->pTableScanOp);
    }
    if (!pResult) {
      // The freshly prepared range produced nothing; move on to the next range while any is left.
      if (prepareRes) {
        continue;
      }
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
      pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
      pTableScanInfo->base.dataReader = NULL;
      return NULL;
    }

    doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
    if (pResult->info.rows == 0) {
      continue;
    }

    if (pInfo->partitionSup.needCalc) {
      // Keep only the rows whose computed partition group is the group being recalculated.
      SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
      if (tmpBlock == NULL) {
        T_LONG_JMP(pInfo->pTableScanOp->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
      }

      blockDataCleanup(pResult);
      for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
        if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, tmpBlock, i) == pInfo->groupId) {
          for (int32_t j = 0; j < pInfo->pTableScanOp->exprSupp.numOfExprs; j++) {
            SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
            SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
            bool             isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
            char*            pSrcData = colDataGetData(pSrcCol, i);
            colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull);
          }
          pResult->info.rows++;
        }
      }

      blockDataDestroy(tmpBlock);

      if (pResult->info.rows > 0) {
        pResult->info.calWin = pInfo->updateWin;
        return pResult;
      }
    } else if (pResult->info.id.groupId == pInfo->groupId) {
      pResult->info.calWin = pInfo->updateWin;
      return pResult;
    }
  }
}
1270

1271
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
X
Xiaoyu Wang 已提交
1272
                                   SSessionKey* pKey) {
1273 1274 1275
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
X
Xiaoyu Wang 已提交
1276

1277 1278
  void* pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
  int32_t          code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
1279 1280 1281
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
D
dapan1121 已提交
1282 1283

  taosMemoryFree(pCur);
1284 1285 1286
  return code;
}

1287
/*
 * Convert the changed ranges of pSrcBlock into session-window scan ranges in pDestBlock.
 *
 * For each source row the group id is resolved (getGroupIdByData, at the block's version minus one). The session
 * window is looked up for the start ts and for the end ts, falling back to getPreSessionWindow for the end ts. One
 * output row is emitted spanning from the start window's skey to the end window's ekey. Rows for which either
 * window cannot be found are skipped.
 *
 * Output rows are appended densely, so pDestBlock may end up with fewer rows than pSrcBlock.
 * Returns TSDB_CODE_SUCCESS, or the error of blockDataEnsureCapacity.
 */
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int64_t          ver = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver);
    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }
    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    }
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // Neither the window holding the end ts nor a preceding one exists, so no valid range can be formed.
      qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
      continue;
    }

    // Append at the next free output row rather than at i: skipped source rows must not leave holes.
    int64_t dstRow = pDestBlock->info.rows;
    colDataSetVal(pDestStartCol, dstRow, (const char*)&startWin.win.skey, false);
    colDataSetVal(pDestEndCol, dstRow, (const char*)&endWin.win.ekey, false);

    colDataSetNULL(pDestUidCol, dstRow);
    colDataSetVal(pDestGpCol, dstRow, (const char*)&groupId, false);
    colDataSetNULL(pDestCalStartTsCol, dstRow);
    colDataSetNULL(pDestCalEndTsCol, dstRow);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1341 1342 1343 1344 1345 1346

/*
 * Build the range-scan request block for interval windows from a block of changed ranges.
 *
 * Every output row describes one window that has to be recomputed:
 *   START / END                       - bounds of the window returned by getSlidingWindow()
 *   CALCULATE_START / CALCULATE_END   - first and last source start timestamps consumed for it
 *   UID / GROUPID                     - source table uid and group id
 *
 * When partitionSup.needCalc is set and the first source row covers a time range, pSrcBlock
 * is rebuilt in place: one point range per row of readPreVersionData(), each tagged with a
 * group id computed by calGroupIdByData().
 *
 * Returns TSDB_CODE_SUCCESS or the error of blockDataEnsureCapacity().
 */
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);

  int32_t numOfRows = pSrcBlock->info.rows;
  if (numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SColumnInfoData* pStartIn = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndIn = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidIn = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupIn = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  ASSERT(pStartIn->info.type == TSDB_DATA_TYPE_TIMESTAMP);

  // lookups below are done against the version just before this block's version
  int64_t prevVer = pSrcBlock->info.version - 1;

  TSKEY* startIn = (TSKEY*)pStartIn->pData;
  TSKEY* endIn = (TSKEY*)pEndIn->pData;
  if (pInfo->partitionSup.needCalc && startIn[0] != endIn[0]) {
    uint64_t     tableUid = ((uint64_t*)pUidIn->pData)[0];
    SSDataBlock* pPrevData = readPreVersionData(pInfo->pTableScanOp, tableUid, startIn[0], endIn[0], prevVer);
    printDataBlock(pPrevData, "pre res");

    // replace the source rows with one [ts, ts] range per previous-version row
    blockDataCleanup(pSrcBlock);
    int32_t ret = blockDataEnsureCapacity(pSrcBlock, pPrevData->info.rows);
    if (ret != TSDB_CODE_SUCCESS) {
      return ret;
    }

    SColumnInfoData* pPrevTsCol = (SColumnInfoData*)taosArrayGet(pPrevData->pDataBlock, pInfo->primaryTsIndex);
    numOfRows = pPrevData->info.rows;
    for (int32_t r = 0; r < numOfRows; ++r) {
      uint64_t rowGroupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPrevData, r);
      // re-read the buffer after calGroupIdByData(), which is handed pPrevData
      TSKEY* pRowTs = ((TSKEY*)pPrevTsCol->pData) + r;
      appendOneRowToStreamSpecialBlock(pSrcBlock, pRowTs, pRowTs, &tableUid, &rowGroupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }

  // pSrcBlock may just have been rebuilt, so fetch its column buffers again
  startIn = (TSKEY*)pStartIn->pData;
  endIn = (TSKEY*)pEndIn->pData;
  uint64_t* uidIn = (uint64_t*)pUidIn->pData;
  uint64_t* groupIn = (uint64_t*)pGroupIn->pData;

  int32_t code = blockDataEnsureCapacity(pDestBlock, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pWinStartOut = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pWinEndOut = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidOut = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupOut = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalcStartOut = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalcEndOut = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t pos = 0;
  while (pos < numOfRows) {
    uint64_t tableUid = uidIn[pos];
    uint64_t groupId = groupIn[pos];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, tableUid, startIn[pos], prevVer);
    }

    int64_t outRow = pDestBlock->info.rows;
    TSKEY   calcStart = startIn[pos];
    colDataSetVal(pCalcStartOut, outRow, (const char*)&calcStart, false);

    // getSlidingWindow() is the only thing that advances pos; pos - 1 is the last row it consumed
    STimeWindow win = getSlidingWindow(startIn, endIn, groupIn, &pInfo->interval, &pSrcBlock->info, &pos,
                                       pInfo->partitionSup.needCalc);
    TSKEY       calcEnd = startIn[pos - 1];

    colDataSetVal(pCalcEndOut, outRow, (const char*)&calcEnd, false);
    colDataSetVal(pUidOut, outRow, (const char*)&tableUid, false);
    colDataSetVal(pWinStartOut, outRow, (const char*)&win.skey, false);
    colDataSetVal(pWinEndOut, outRow, (const char*)&win.ekey, false);
    colDataSetVal(pGroupOut, outRow, (const char*)&groupId, false);
    pDestBlock->info.rows = outRow + 1;
  }

  return TSDB_CODE_SUCCESS;
}
1418

1419
/*
 * Copy the delete ranges of pSrcBlock into pDestBlock.
 *
 * Rows whose GROUPID is 0 get a group id from getGroupIdByData() (looked up one version before
 * the block's version). When sub-table name expressions are configured (tbnameCalSup.pExprInfo),
 * the partition table name stored for the group is written into the TABLE_NAME column;
 * otherwise that column is NULL.
 *
 * Returns TSDB_CODE_SUCCESS or the error of blockDataEnsureCapacity().
 */
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;

  int64_t ver = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < rows; i++) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver);
    }

    // var-string buffer: length header followed by the name (was wrongly declared as char*[])
    char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
    bool hasTbName = false;
    if (pInfo->tbnameCalSup.pExprInfo) {
      void* parTbname = NULL;
      pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
      // no stored name for this group: leave the TABLE_NAME column NULL instead of memcpy'ing from NULL
      if (parTbname != NULL) {
        memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
        // force termination so strlen() cannot run past the end of tbname
        varDataVal(tbname)[TSDB_TABLE_NAME_LEN - 1] = 0;
        varDataSetLen(tbname, strlen(varDataVal(tbname)));
        hasTbName = varDataLen(tbname) > 0;
        pInfo->stateStore.streamStateFreeVal(parTbname);
      }
    }

    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     hasTbName ? tbname : NULL);
  }
  return TSDB_CODE_SUCCESS;
}

1461 1462 1463 1464
/*
 * Convert a block of changed ranges (pSrcBlock) into a STREAM_CLEAR block (pDestBlock),
 * choosing the generator by window kind: interval, session/state, or plain delete ranges.
 *
 * The destination block metadata (type, version, dataLoad, ts window) is stamped even when
 * the generator returned an error; that error code is returned unchanged.
 */
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t code;
  if (isIntervalWindow(pInfo)) {
    code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
  } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
    code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
  } else {
    code = generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
  }

  SDataBlockInfo* pDestInfo = &pDestBlock->info;
  pDestInfo->type = STREAM_CLEAR;
  pDestInfo->version = pSrcBlock->info.version;
  pDestInfo->dataLoad = 1;
  blockDataUpdateTsWindow(pDestBlock, 0);

  return code;
}

5
54liuyao 已提交
1477
/*
 * Reset pInfo->pCreateTbRes and, if sub-table-name or tag expressions are configured, fill it
 * via appendCreateTableRow() for pBlock's group id (passing 0, presumably the row index).
 * Without such expressions the block's parTbName is cleared and pCreateTbRes stays empty.
 */
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  blockDataCleanup(pInfo->pCreateTbRes);

  if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
    pBlock->info.parTbName[0] = 0;
    return;
  }

  appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
                       pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, &pInfo->stateStore);
}

1488 1489
/*
 * Append one row to a stream "special" block with the fixed column layout
 * START_TS / END_TS / UID / GROUPID / CALCULATE_START_TS / CALCULATE_END_TS / TABLE_NAME.
 *
 * The calculate range is set to the same [*pStartTs, *pEndTs] pair. A NULL pTbName stores a
 * NULL table name. This function does not reserve capacity itself; the caller must have done so.
 */
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  int64_t row = pBlock->info.rows;

  colDataSetVal(taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX), row, (const char*)pStartTs, false);
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX), row, (const char*)pEndTs, false);
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX), row, (const char*)pUid, false);
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX), row, (const char*)pGp, false);
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), row, (const char*)pStartTs, false);
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), row, (const char*)pEndTs, false);
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX), row, (const char*)pTbName,
                pTbName == NULL);

  pBlock->info.rows = row + 1;
}

1507
/*
 * Run update detection over the rows of pBlock.
 *
 * updateInfoIsUpdated() is called for every row that is not skipped as expired. When out is
 * true, a row is appended to pInfo->pUpdateDataRes as a [ts, ts] range with group id 0 if it is
 * reported as updated, or if it falls into a closed single-interval window that
 * isDeletedStreamWindow() accepts. In the closed-window case with partitionSup.needCalc set, a
 * second row carrying the computed group id is appended. A non-empty result is typed
 * STREAM_DELETE_DATA when needCalc is set, STREAM_CLEAR otherwise.
 *
 * The invertible parameter is not used.
 */
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
    // each input row appends at most two rows below
    int32_t code = blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
    if (code != TSDB_CODE_SUCCESS) {
      // Appending without the reserved space would write past the column buffers. Keep walking
      // the rows so updateInfoIsUpdated() is still called for each of them, but emit nothing.
      qError("stream update check failed to reserve %" PRId64 " rows, code:%d", (int64_t)(pBlock->info.rows * 2),
             code);
      out = false;
    }
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
  bool   tableInserted = pInfo->stateStore.updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);

  for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
    bool overDue = isOverdue(tsCol[rowId], &pInfo->twAggSup);
    if (pInfo->igExpired && overDue) {
      continue;
    }

    bool        isClosed = false;
    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    if (tableInserted && overDue) {
      // zero-initialise so getActiveTimeWindow() never sees indeterminate fields
      SResultRowInfo dumyInfo = {0};
      dumyInfo.cur.pageId = -1;
      win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }

    // must check update info first: evaluated unconditionally, not short-circuited by closedWin
    bool update = pInfo->stateStore.updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]);
    bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
                     isDeletedStreamWindow(&win, pBlock->info.id.groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore);
    if ((update || closedWin) && out) {
      qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
      uint64_t gpId = 0;
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId,
                                       NULL);
      if (closedWin && pInfo->partitionSup.needCalc) {
        gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
        appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid,
                                         &gpId, NULL);
      }
    }
  }

  if (out && pInfo->pUpdateDataRes->info.rows > 0) {
    pInfo->pUpdateDataRes->info.version = pBlock->info.version;
    pInfo->pUpdateDataRes->info.dataLoad = 1;
    blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
    pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
  }
}
L
Liu Jicong 已提交
1553

1554
/*
 * Project pBlock into pInfo->pRes.
 *
 * Steps:
 *  - Copy every output column listed in pInfo->matchInfo; columns missing from pBlock become all-NULL.
 *  - Fill the pseudo columns with addTagPseudoColumnData().
 *  - Apply the operator filter when filter is true.
 *  - Prepare the create-table result via calBlockTbName().
 *
 * Fatal errors call blockDataFreeRes() on pBlock and long-jump through pTaskInfo->env. A
 * TSDB_CODE_PAR_TABLE_NOT_EXIST from the pseudo-column step is tolerated, because the table may
 * have been dropped during the scan. Returns TSDB_CODE_SUCCESS otherwise.
 */
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // colDataAssign() below writes pBlock->info.rows rows; never proceed without that capacity
  int32_t code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataFreeRes((SSDataBlock*)pBlock);
    T_LONG_JMP(pTaskInfo->env, code);
  }

  pInfo->pRes->info.rows = pBlock->info.rows;
  pInfo->pRes->info.id.uid = pBlock->info.id.uid;
  pInfo->pRes->info.type = STREAM_NORMAL;
  pInfo->pRes->info.version = pBlock->info.version;

  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
  pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);

  size_t numOfMatched = taosArrayGetSize(pInfo->matchInfo.pList);
  for (int32_t i = 0; i < numOfMatched; ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
      continue;
    }

    bool colExists = false;
    for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
      SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
      if (pResCol->info.colId == pColMatchInfo->colId) {
        SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
        colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
        colExists = true;
        break;
      }
    }

    // the required column does not exist in the submit block, so set it to all NULL values
    if (!colExists) {
      SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
      colDataSetNNULL(pDst, 0, pBlockInfo->rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
                                  pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache);
    // ignore the table-not-exist error, since this table may have been dropped during the scan procedure
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reset the error code left behind by a tolerated failure
    terrno = 0;
  }

  if (filter) {
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
  }

  pInfo->pRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

  calBlockTbName(pInfo, pInfo->pRes);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
1619

L
Liu Jicong 已提交
1620
/*
 * Return the next result block of a queue (TMQ) scan, or NULL.
 *
 * When the current offset is TMQ_OFFSET__SNAPSHOT_DATA, blocks come from the table-scan
 * operator. Once it yields no rows, the tsdb reader is closed, the tq reader seeks to
 * snapshotVer + 1 and the offset is switched to the log. When the offset is TMQ_OFFSET__LOG,
 * WAL blocks are read and projected until one keeps rows after filtering.
 *
 * NULL is returned when:
 *  - the task is killed;
 *  - the WAL has no more data;
 *  - the seek fails;
 *  - the offset type is unexpected.
 */
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
  SStreamScanInfo* pInfo = pOperator->info;
  const char*      id = GET_TASKID(pTaskInfo);

  qDebug("start to exec queue scan, %s", id);

  if (isTaskKilled(pTaskInfo)) {
    return NULL;
  }

  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
      tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
      return pResult;
    }

    // snapshot exhausted: release the tsdb reader and continue from the WAL
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;

    int64_t walVer = pTaskInfo->streamInfo.snapshotVer + 1;
    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", walVer);
    if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, walVer, pTaskInfo->id.str) < 0) {
      // previously a silent failure; make it visible before giving up
      qError("queue scan failed to seek wal to ver %" PRId64 ", %s", walVer, id);
      return NULL;
    }

    tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
  }

  if (pTaskInfo->streamInfo.currentOffset.type != TMQ_OFFSET__LOG) {
    qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
    return NULL;
  }

  while (1) {
    bool               hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id);
    SSDataBlock*       pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
    struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);

    // curVersion move to next, so currentOffset = curVersion - 1
    tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion - 1);

    if (!hasResult) {
      qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
      return NULL;
    }

    qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
           pTaskInfo->streamInfo.currentOffset.version);
    blockDataCleanup(pInfo->pRes);
    setBlockIntoRes(pInfo, pRes, true);
    if (pInfo->pRes->info.rows > 0) {
      return pInfo->pRes;
    }
    // every row was filtered out: read the next WAL block
  }
}

L
Liu Jicong 已提交
1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699
/*
 * Copy into pDst only the delete ranges of pSrc whose uid is a table queried by the tq reader.
 *
 * START/END/UID are copied. GROUPID and CALCULATE_START/END are set to NULL. pDst takes over
 * pSrc's block info, except that it keeps its own capacity and its rows field is set to the
 * number of copied rows.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of blockDataEnsureCapacity(); in that case pDst is
 * left untouched.
 */
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;

  // rows are written without bounds checks below, so the capacity must be guaranteed
  int32_t code = blockDataEnsureCapacity(pDst, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startCol = (TSKEY*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endCol = (TSKEY*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDstGpCol = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t j = 0;
  for (int32_t i = 0; i < rows; i++) {
    if (!pInfo->readerFn.tqReaderIsQueriedTable(pReader, uidCol[i])) {
      continue;
    }

    colDataSetVal(pDstStartCol, j, (const char*)&startCol[i], false);
    colDataSetVal(pDstEndCol, j, (const char*)&endCol[i], false);
    colDataSetVal(pDstUidCol, j, (const char*)&uidCol[i], false);

    colDataSetNULL(pDstGpCol, j);
    colDataSetNULL(pDstCalStartCol, j);
    colDataSetNULL(pDstCalEndCol, j);
    j++;
  }

  // copy the source block info but keep pDst's own capacity
  uint32_t cap = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.rows = j;
  pDst->info.capacity = cap;

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734
// for partition by tag
/*
 * Fill the GROUPID column of pBlock with getGroupIdByUid() for each row's uid.
 * Does nothing when the group id must be computed from row data (partitionSup.needCalc).
 */
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;
  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataSetVal(pGpCol, i, (const char*)&groupId, false);
  }
}

5
54liuyao 已提交
1740
/*
 * Run update detection for pBlock and pick the next scan mode if updates were found.
 *
 * Skipped entirely when update checking is disabled (igCheckUpdate) or no update info exists.
 * Otherwise it:
 *  - raises the update info's maxDataVersion to the block's version;
 *  - collects update rows via checkUpdateData();
 *  - raises twAggSup.maxTs to endKey;
 *  - when update rows were collected, resets updateResIndex and selects a scan mode from the
 *    type of pUpdateDataRes.
 */
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
  if (pInfo->igCheckUpdate || pInfo->pUpdateInfo == NULL) {
    return;
  }

  pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
  checkUpdateData(pInfo, true, pBlock, true);
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);

  SSDataBlock* pUpdateRes = pInfo->pUpdateDataRes;
  if (pUpdateRes->info.rows <= 0) {
    return;
  }

  pInfo->updateResIndex = 0;
  switch (pUpdateRes->info.type) {
    case STREAM_CLEAR:
      pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      break;
    case STREAM_INVERT:
      // the update block itself is handed out as the next result
      pInfo->scanMode = STREAM_SCAN_FROM_RES;
      break;
    case STREAM_DELETE_DATA:
      pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
      break;
    default:
      break;
  }
}

1759 1760 1761 1762 1763 1764
//int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
//  int32_t len = updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
//  *pBuff = taosMemoryCalloc(1, len);
//  updateInfoSerialize(*pBuff, len, pInfo->pUpdateInfo);
//  return len;
//}
L
liuyao 已提交
1765 1766

// other properties are recovered from the execution plan
1767
void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) {
L
fix bug  
liuyao 已提交
1768
  if (!pBuff || len == 0) {
L
liuyao 已提交
1769 1770 1771
    return;
  }

1772 1773
  void* pUpInfo = pInfo->stateStore.updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
  int32_t      code = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo);
L
liuyao 已提交
1774 1775 1776 1777 1778
  if (code == TSDB_CODE_SUCCESS) {
    pInfo->pUpdateInfo = pUpInfo;
  }
}

L
Liu Jicong 已提交
1779 1780
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
1781 1782
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  const char*    id = GET_TASKID(pTaskInfo);
1783

1784
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
L
Liu Jicong 已提交
1785 1786
  SStreamScanInfo* pInfo = pOperator->info;

1787
  qDebug("stream scan started, %s", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
1788

1789 1790
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
L
Liu Jicong 已提交
1791
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1792
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
1793
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
H
Haojun Liao 已提交
1794 1795
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
1796
      qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion,
H
Haojun Liao 已提交
1797
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1798
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
1799
    } else {
H
Haojun Liao 已提交
1800 1801
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
1802
      qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion,
H
Haojun Liao 已提交
1803
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1804
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
1805
    }
L
Liu Jicong 已提交
1806

1807
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1808

H
Haojun Liao 已提交
1809
    pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
1810
    pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1811

L
Liu Jicong 已提交
1812 1813
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
L
Liu Jicong 已提交
1814
    pTaskInfo->streamInfo.recoverScanFinished = false;
L
Liu Jicong 已提交
1815 1816
  }

5
54liuyao 已提交
1817 1818
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
L
Liu Jicong 已提交
1819 1820 1821 1822 1823
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }
5
54liuyao 已提交
1824 1825 1826 1827 1828 1829 1830

    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        printDataBlock(pInfo->pRecoverRes, "scan recover");
        return pInfo->pRecoverRes;
      } break;
5
54liuyao 已提交
1831 1832 1833 1834
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
1835
        printDataBlock(pInfo->pUpdateRes, "recover update");
5
54liuyao 已提交
1836 1837
        return pInfo->pUpdateRes;
      } break;
1838 1839 1840 1841 1842 1843 1844 1845 1846
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        printDataBlock(pInfo->pDeleteDataRes, "recover delete");
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
1847 1848 1849 1850 1851 1852
      case STREAM_SCAN_FROM_DATAREADER_RANGE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
1853
          printDataBlock(pSDB, "scan recover update");
5
54liuyao 已提交
1854 1855 1856 1857 1858 1859
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
5
54liuyao 已提交
1860 1861 1862 1863 1864 1865
      default:
        break;
    }

    pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
    if (pInfo->pRecoverRes != NULL) {
L
Liu Jicong 已提交
1866
      pInfo->blockRecoverContiCnt++;
5
54liuyao 已提交
1867
      calBlockTbName(pInfo, pInfo->pRecoverRes);
L
liuyao 已提交
1868
      if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
5
54liuyao 已提交
1869
        if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
1870
          TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
1871 1872
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
        } else {
L
liuyao 已提交
1873
          pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pTaskInfo->streamInfo.fillHistoryVer2);
5
54liuyao 已提交
1874 1875
          doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
        }
1876
      }
5
54liuyao 已提交
1877 1878
      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
1879
        printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
5
54liuyao 已提交
1880 1881
        return pInfo->pCreateTbRes;
      }
1882

X
Xiaoyu Wang 已提交
1883
      qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
5
54liuyao 已提交
1884 1885
      printDataBlock(pInfo->pRecoverRes, "scan recover");
      return pInfo->pRecoverRes;
L
Liu Jicong 已提交
1886 1887
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
L
Liu Jicong 已提交
1888
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
1889
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1890

H
Haojun Liao 已提交
1891
    pTSInfo->base.dataReader = NULL;
1892

H
Haojun Liao 已提交
1893 1894
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
L
Liu Jicong 已提交
1895

L
Liu Jicong 已提交
1896
    pTaskInfo->streamInfo.recoverScanFinished = true;
L
Liu Jicong 已提交
1897 1898 1899
    return NULL;
  }

5
54liuyao 已提交
1900
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
5
54liuyao 已提交
1901
// TODO: refactor
L
Liu Jicong 已提交
1902
FETCH_NEXT_BLOCK:
L
Liu Jicong 已提交
1903
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
1904
    if (pInfo->validBlockIndex >= total) {
L
Liu Jicong 已提交
1905
      doClearBufferedBlocks(pInfo);
H
Haojun Liao 已提交
1906 1907 1908
      return NULL;
    }

1909 1910 1911
    int32_t  current = pInfo->validBlockIndex++;
    qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id);

L
Liu Jicong 已提交
1912 1913
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
5
54liuyao 已提交
1914
    if (pBlock->info.parTbName[0]) {
1915
      pAPI->stateStore.streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
1916
    }
1917

1918
    // TODO move into scan
5
54liuyao 已提交
1919 1920
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
1921
    pBlock->info.dataLoad = 1;
L
fix bug  
liuyao 已提交
1922
    if (pInfo->pUpdateInfo) {
1923
      pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
L
fix bug  
liuyao 已提交
1924
    }
1925
    blockDataUpdateTsWindow(pBlock, 0);
1926
    switch (pBlock->info.type) {
L
Liu Jicong 已提交
1927 1928 1929
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
1930 1931 1932
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
1933
        copyDataBlock(pInfo->pUpdateRes, pBlock);
L
liuyao 已提交
1934
        pInfo->updateResIndex = 0;
1935
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
1936
        pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
1937 1938
      } break;
      case STREAM_DELETE_DATA: {
1939
        printDataBlock(pBlock, "stream scan delete recv");
L
Liu Jicong 已提交
1940
        SSDataBlock* pDelBlock = NULL;
L
Liu Jicong 已提交
1941
        if (pInfo->tqReader) {
L
Liu Jicong 已提交
1942
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
L
Liu Jicong 已提交
1943
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
L
Liu Jicong 已提交
1944 1945
        } else {
          pDelBlock = pBlock;
L
Liu Jicong 已提交
1946
        }
5
54liuyao 已提交
1947 1948
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
5
54liuyao 已提交
1949 1950 1951 1952 1953 1954
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
1955
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
L
Liu Jicong 已提交
1956
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
1957
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
L
Liu Jicong 已提交
1958
          printDataBlock(pDelBlock, "stream scan delete result");
H
Haojun Liao 已提交
1959 1960
          blockDataDestroy(pDelBlock);

L
Liu Jicong 已提交
1961 1962 1963 1964 1965
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1966 1967 1968
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
L
Liu Jicong 已提交
1969
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
1970 1971 1972
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
L
Liu Jicong 已提交
1973 1974 1975 1976
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
L
Liu Jicong 已提交
1977
          if (pInfo->pDeleteDataRes->info.rows > 0) {
5
54liuyao 已提交
1978
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
L
Liu Jicong 已提交
1979 1980 1981 1982
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1983
        }
1984 1985 1986
      } break;
      default:
        break;
5
54liuyao 已提交
1987
    }
1988
    // printDataBlock(pBlock, "stream scan recv");
1989
    return pBlock;
L
Liu Jicong 已提交
1990
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
1991
    qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
5
54liuyao 已提交
1992 1993 1994
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
5
54liuyao 已提交
1995
        doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
5
54liuyao 已提交
1996 1997 1998
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
1999 2000 2001
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
5
54liuyao 已提交
2002
      } break;
2003
      case STREAM_SCAN_FROM_DELETE_DATA: {
2004 2005 2006 2007 2008 2009 2010
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
2011 2012 2013 2014 2015 2016 2017 2018 2019 2020
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
2021
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
5
54liuyao 已提交
2022 2023
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
L
liuyao 已提交
2024
          printDataBlock(pSDB, "stream scan update");
L
Liu Jicong 已提交
2025
          calBlockTbName(pInfo, pSDB);
5
54liuyao 已提交
2026 2027
          return pSDB;
        }
2028
        blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
2029 2030 2031 2032
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
2033
    }
2034

2035
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
5
54liuyao 已提交
2036
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
2037 2038
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
5
54liuyao 已提交
2039 2040
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
2041
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
2042
      pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
2043
      return pInfo->pUpdateRes;
5
54liuyao 已提交
2044
    }
5
54liuyao 已提交
2045

2046 2047
    SSDataBlock*    pBlock = pInfo->pRes;
    SDataBlockInfo* pBlockInfo = &pBlock->info;
H
Haojun Liao 已提交
2048
    int32_t         totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
2049

L
Liu Jicong 已提交
2050
  NEXT_SUBMIT_BLK:
2051
    while (1) {
2052
      if (pInfo->readerFn.tqReaderCurrentBlockConsumed(pInfo->tqReader)) {
2053
        if (pInfo->validBlockIndex >= totalBlocks) {
2054
          pAPI->stateStore.updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
2055 2056 2057 2058 2059 2060 2061 2062 2063 2064
          doClearBufferedBlocks(pInfo);

          qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
          return NULL;
        }

        int32_t      current = pInfo->validBlockIndex++;
        SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);

        qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id);
2065
        if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
2066 2067 2068 2069
          qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id);
          continue;
        }
      }
2070

2071
      blockDataCleanup(pBlock);
2072

2073 2074
      while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
        SSDataBlock* pRes = NULL;
2075

2076
        int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
2077 2078
        qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows,
               id);
H
Haojun Liao 已提交
2079

2080
        if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
2081
          qDebug("retrieve data failed, try next block in submit block, %s", id);
2082 2083 2084
          continue;
        }

2085
        setBlockIntoRes(pInfo, pRes, false);
2086

5
54liuyao 已提交
2087 2088
        if (pInfo->pCreateTbRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
2089
          qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id);
5
54liuyao 已提交
2090
          return pInfo->pCreateTbRes;
2091 2092
        }

2093 2094 2095 2096
        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
        doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
        pBlock->info.dataLoad = 1;
        blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
2097

2098 2099
        qDebug("%" PRId64 " rows in datablock, update res:%" PRId64 " %s", pBlockInfo->rows,
               pInfo->pUpdateDataRes->info.rows, id);
2100 2101 2102
        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
          break;
        }
2103
      }
H
Haojun Liao 已提交
2104

2105
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
5
54liuyao 已提交
2106
        break;
2107 2108
      } else {
        continue;
5
54liuyao 已提交
2109
      }
H
Haojun Liao 已提交
2110 2111 2112 2113
    }

    // record the scan action.
    pInfo->numOfExec++;
2114
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
H
Haojun Liao 已提交
2115

2116
    qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id);
L
Liu Jicong 已提交
2117
    if (pBlockInfo->rows > 0) {
2118
      return pBlock;
L
Liu Jicong 已提交
2119
    }
2120 2121 2122 2123 2124 2125

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
H
Haojun Liao 已提交
2126
  }
2127

2128
  return NULL;
H
Haojun Liao 已提交
2129 2130
}

H
Haojun Liao 已提交
2131
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2132 2133 2134
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2135 2136 2137
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2138 2139 2140 2141 2142 2143
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2144
/*
 * "next" function of the raw (db / super-table subscription) scan operator.
 *
 * Dispatches on the type of the task's current stream offset:
 *  - TMQ_OFFSET__SNAPSHOT_DATA:
 *      - Returns the next data block of the table currently being read and advances the offset to it.
 *      - Otherwise advances to the next snapshot table, or to the WAL once the snapshot is exhausted, and returns NULL.
 *  - TMQ_OFFSET__SNAPSHOT_META:
 *      - Fetches one table's meta from the snapshot and publishes it through streamInfo.metaRsp.
 *      - When no more meta is queried, switches to data for the next poll. Always returns NULL.
 *  - anything else: returns NULL.
 *
 * Errors from the data reader, and task cancellation, are raised with T_LONG_JMP on pTaskInfo->env.
 */
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
  // NOTE: this operator never checks whether its current status is done or not
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*        pAPI = &pTaskInfo->storageAPI;
  SStreamRawScanInfo* pInfo = pOperator->info;

  // metaRspLen != 0 is what marks the output of this call as meta rather than data
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;

  qDebug("tmqsnap doRawScan called");

  switch (pTaskInfo->streamInfo.currentOffset.type) {
    case TMQ_OFFSET__SNAPSHOT_DATA: {
      // Step 1: serve the next block of the table that is currently being read, if there is one.
      bool hasNext = false;
      if (pInfo->dataReader != NULL) {
        int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->dataReader, &hasNext);
        if (code != TSDB_CODE_SUCCESS) {
          pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
          T_LONG_JMP(pTaskInfo->env, code);
        }
      }

      if (hasNext) {  // hasNext can only become true when a data reader exists
        if (isTaskKilled(pTaskInfo)) {
          pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
          T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
        }

        SSDataBlock* pBlock = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, NULL);
        if (pBlock == NULL) {
          T_LONG_JMP(pTaskInfo->env, terrno);
        }

        qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid);
        tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pBlock->info.id.uid, pBlock->info.window.ekey);
        return pBlock;
      }

      // Step 2: current table exhausted -> move to the next snapshot table, or to the WAL when none is left.
      SMetaTableInfo mtInfo = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext);
      STqOffsetVal   offset = {0};
      if (mtInfo.uid != 0) {
        tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN);
        qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid);
      } else {
        qDebug("tmqsnap read snapshot done, change to get data from wal");
        tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion);
      }
      qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
      tDeleteSchemaWrapper(mtInfo.schema);
      return NULL;
    }

    case TMQ_OFFSET__SNAPSHOT_META: {
      SSnapContext* sContext = pInfo->sContext;
      void*         data = NULL;
      int32_t       dataLen = 0;
      int16_t       type = 0;
      int64_t       uid = 0;

      if (pAPI->snapshotFn.getTableInfoFromSnapshot(sContext, &data, &dataLen, &type, &uid) < 0) {
        qError("tmqsnap getTableInfoFromSnapshot error");
        taosMemoryFreeClear(data);
        return NULL;
      }

      if (sContext->queryMeta) {
        // hand the meta message over to the caller through metaRsp
        tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, uid);
        pTaskInfo->streamInfo.metaRsp.resMsgType = type;
        pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
        pTaskInfo->streamInfo.metaRsp.metaRsp = data;
      } else {
        // meta is done: start fetching data from the next poll request
        STqOffsetVal offset = {0};
        tqOffsetResetToData(&offset, 0, INT64_MIN);
        qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType);
      }
      return NULL;
    }

    default:
      return NULL;
  }
}

wmmhello's avatar
wmmhello 已提交
2221
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2222
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
2223
  pRawScan->pAPI->tsdReader.tsdReaderClose(pRawScan->dataReader);
2224
  pRawScan->pAPI->snapshotFn.destroySnapshot(pRawScan->sContext);
2225
  tableListDestroy(pRawScan->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2226 2227 2228
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2229 2230 2231
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2232
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2233 2234 2235 2236 2237
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2238 2239
  int32_t code = TSDB_CODE_SUCCESS;

2240
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2241
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2242
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2243 2244
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2245 2246
  }

2247
  pInfo->pTableListInfo = tableListCreate();
wmmhello's avatar
wmmhello 已提交
2248
  pInfo->vnode = pHandle->vnode;
H
Haojun Liao 已提交
2249
  pInfo->pAPI = &pTaskInfo->storageAPI;
wmmhello's avatar
wmmhello 已提交
2250

2251
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2252 2253
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2254

2255
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2256
  return pOperator;
H
Haojun Liao 已提交
2257

L
Liu Jicong 已提交
2258
_end:
H
Haojun Liao 已提交
2259 2260 2261 2262
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2263 2264
}

2265
static void destroyStreamScanOperatorInfo(void* param) {
2266
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
2267

2268
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
2269
    destroyOperator(pStreamScan->pTableScanOp);
2270
  }
2271

2272
  if (pStreamScan->tqReader) {
2273
    pStreamScan->readerFn.tqReaderClose(pStreamScan->tqReader);
2274
  }
H
Haojun Liao 已提交
2275 2276
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2277
  }
C
Cary Xu 已提交
2278 2279
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2280
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2281
  }
C
Cary Xu 已提交
2282

L
Liu Jicong 已提交
2283
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
5
54liuyao 已提交
2284
  cleanupExprSupp(&pStreamScan->tagCalSup);
L
Liu Jicong 已提交
2285

2286
  pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo);
2287 2288 2289 2290
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2291
  blockDataDestroy(pStreamScan->pUpdateDataRes);
5
54liuyao 已提交
2292
  blockDataDestroy(pStreamScan->pCreateTbRes);
2293 2294 2295 2296
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2297
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2298
                                            STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2299
  SArray*          pColIds = NULL;
2300 2301
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2302 2303
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
  const char* idstr = pTaskInfo->id.str;
2304

H
Haojun Liao 已提交
2305
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2306
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2307
    tableListDestroy(pTableListInfo);
2308
    goto _error;
H
Haojun Liao 已提交
2309 2310
  }

2311
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2312
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2313

2314
  pInfo->pTagCond = pTagCond;
2315
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2316

2317
  int32_t numOfCols = 0;
2318 2319
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2320
  if (code != TSDB_CODE_SUCCESS) {
2321
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2322 2323
    goto _error;
  }
2324

H
Haojun Liao 已提交
2325
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2326
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2327
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2328
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2329 2330

    int16_t colId = id->colId;
2331
    taosArrayPush(pColIds, &colId);
2332
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2333
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2334
    }
H
Haojun Liao 已提交
2335 2336
  }

L
Liu Jicong 已提交
2337 2338 2339 2340
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2341
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2342 2343
      goto _error;
    }
2344

L
Liu Jicong 已提交
2345 2346
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
2347
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1, &pTaskInfo->storageAPI.functionStore) != 0) {
2348
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2349 2350 2351 2352
      goto _error;
    }
  }

2353 2354
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
5
54liuyao 已提交
2355
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
2356 2357
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2358
      tableListDestroy(pTableListInfo);
2359 2360
      goto _error;
    }
2361
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore) != 0) {
2362
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2363
      tableListDestroy(pTableListInfo);
2364 2365 2366 2367
      goto _error;
    }
  }

L
Liu Jicong 已提交
2368
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
H
Haojun Liao 已提交
2369
  if (pInfo->pBlockLists == NULL) {
2370
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2371
    tableListDestroy(pTableListInfo);
2372
    goto _error;
H
Haojun Liao 已提交
2373 2374
  }

5
54liuyao 已提交
2375
  if (pHandle->vnode) {
2376
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
2377 2378 2379 2380
    if (pTableScanOp == NULL) {
      qError("createTableScanOperatorInfo error, errorcode: %d", pTaskInfo->code);
      goto _error;
    }
L
Liu Jicong 已提交
2381
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2382
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2383
      pTSInfo->base.cond.endVersion = pHandle->version;
2384
    }
L
Liu Jicong 已提交
2385

2386
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2387
    int32_t        num = 0;
2388
    tableListGetGroupList(pTableListInfo, 0, &pList, &num);
2389

2390
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2391
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2392
      pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
2393 2394
    }

L
Liu Jicong 已提交
2395 2396
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
2397
      pInfo->tqReader = pAPI->tqReaderFn.tqReaderOpen(pHandle->vnode);
L
Liu Jicong 已提交
2398
      ASSERT(pInfo->tqReader);
2399
    } else {
L
Liu Jicong 已提交
2400 2401
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2402 2403
    }

2404
    pInfo->pUpdateInfo = NULL;
2405
    pInfo->pTableScanOp = pTableScanOp;
2406
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
2407
      pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
2408
    }
L
Liu Jicong 已提交
2409

L
Liu Jicong 已提交
2410
    pInfo->readHandle = *pHandle;
L
Liu Jicong 已提交
2411
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
5
54liuyao 已提交
2412 2413
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
L
Liu Jicong 已提交
2414

L
Liu Jicong 已提交
2415
    // set the extract column id to streamHandle
2416
    pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds);
2417
    SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
2418
    code = pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList, idstr);
L
Liu Jicong 已提交
2419 2420 2421 2422
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
2423

L
Liu Jicong 已提交
2424
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2425
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2426 2427
  } else {
    taosArrayDestroy(pColIds);
2428
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2429
    pColIds = NULL;
5
54liuyao 已提交
2430 2431
  }

2432 2433 2434 2435 2436
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2437 2438 2439 2440 2441
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2442
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2443
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2444
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2445
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2446
  pInfo->groupId = 0;
2447
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2448
  pInfo->pStreamScanOp = pOperator;
2449
  pInfo->deleteDataIndex = 0;
2450
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2451
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2452
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2453
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2454
  pInfo->partitionSup.needCalc = false;
5
54liuyao 已提交
2455 2456
  pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
  pInfo->igExpired = pTableScanNode->igExpired;
2457
  pInfo->twAggSup.maxTs = INT64_MIN;
L
liuyao 已提交
2458
  pInfo->pState = NULL;
2459 2460
  pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
  pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
L
Liu Jicong 已提交
2461

L
fix bug  
liuyao 已提交
2462 2463 2464 2465
  // for stream
  if (pTaskInfo->streamInfo.pState) {
    void*   buff = NULL;
    int32_t len = 0;
2466 2467
    pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len);
    streamScanOperatorDecode(buff, len, pInfo);
dengyihao's avatar
dengyihao 已提交
2468
    taosMemoryFree(buff);
L
fix bug  
liuyao 已提交
2469
  }
L
liuyao 已提交
2470

L
fix bug  
liuyao 已提交
2471
  setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
L
Liu Jicong 已提交
2472
                  pTaskInfo);
2473
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2474

2475
  __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2476 2477
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2478

H
Haojun Liao 已提交
2479
  return pOperator;
2480

L
Liu Jicong 已提交
2481
_error:
H
Haojun Liao 已提交
2482 2483 2484 2485 2486 2487 2488 2489
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2490 2491
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2492 2493
}

2494
/*
 * Fill row `count` of pRes for the table at pInfo->curPos of the operator's table list.
 *
 * The table entry is loaded through the meta reader `mr`. Then each output expression gets a value:
 *  - pseudo-column functions receive the table name as a var-length string;
 *  - every other expression receives the tag value whose column id is given by its first parameter.
 *
 * On a meta lookup failure the reader is cleared and the error is raised via T_LONG_JMP.
 */
static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr, SStorageAPI* pAPI) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprs = pOperator->exprSupp.pExprInfo;

  STableKeyInfo* pKeyInfo = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
  int32_t        code = pAPI->metaReaderFn.getTableEntryByUid(mr, pKeyInfo->uid);
  tDecoderClear(&mr->coder);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKeyInfo->uid, tstrerror(terrno),
           GET_TASKID(pTaskInfo));
    pAPI->metaReaderFn.clearReader(mr);
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  char nameBuf[512];
  for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
    SExprInfo*       pExpr = &pExprs[k];
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExpr->base.resSchema.slotId);

    // refactor later
    if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
      STR_TO_VARSTR(nameBuf, mr->me.name);
      colDataSetVal(pDst, count, nameBuf, false);
      continue;
    }

    // it is a tag value
    STagVal tagVal = {0};
    tagVal.cid = pExpr->base.pParam[0].pCol->colId;
    const char* pTag = pAPI->metaFn.extractTagVal(mr->me.ctbEntry.pTags, pDst->info.type, &tagVal);

    // non-JSON tags are converted to their column representation; JSON tags are used as-is
    bool  converted = (pDst->info.type != TSDB_DATA_TYPE_JSON) && (pTag != NULL);
    char* pData = converted ? tTagValToData((const STagVal*)pTag, false) : (char*)pTag;
    bool  isNull = (pData == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(pData));
    colDataSetVal(pDst, count, pData, isNull);

    // a converted var-length value is a fresh allocation owned here
    if (converted && IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type) && pData != NULL) {
      taosMemoryFree(pData);
    }
  }
}

2539
/*
 * "next" function of the tag scan operator.
 *
 * Emits one row per table of pInfo->pTableListInfo, starting at pInfo->curPos. A block stops growing
 * when it reaches resultInfo.capacity or when the table list is exhausted.
 *
 * When pSlimit is set:
 *  - once curPos has reached the slimit offset, the loop stops after a single table, so that table
 *    forms its own block;
 *  - the block's groupId is computed from that table's name.
 *
 * Marks the operator/task completed when all tables are consumed or the slimit window is passed.
 * Returns NULL when no row was produced.
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;
  STagScanInfo*  pInfo = pOperator->info;
  SSDataBlock*   pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  int32_t size = tableListGetSize(pInfo->pTableListInfo);
  if (size == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  int32_t     count = 0;
  SMetaReader mr = {0};
  pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0, &pAPI->metaFn);

  while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
    doTagScanOneTable(pOperator, pRes, count, &mr, pAPI);
    ++count;
    if (++pInfo->curPos >= size) {
      setOperatorCompleted(pOperator);
    }
    // each table with tbname is a group, hence its own block, but only group when slimit exists for performance reason.
    if (pInfo->pSlimit != NULL) {
      if (pInfo->curPos < pInfo->pSlimit->offset) {
        continue;
      }
      pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name));
      if (pInfo->curPos >= (pInfo->pSlimit->offset + pInfo->pSlimit->limit) - 1) {
        setOperatorCompleted(pOperator);
      }
      break;
    }
  }

  pAPI->metaReaderFn.clearReader(&mr);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = count;
  pOperator->resultInfo.totalRows += count;

  return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

2595
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2596 2597
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2598
  taosArrayDestroy(pInfo->matchInfo.pList);
2599
  pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
D
dapan1121 已提交
2600
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2601 2602
}

S
slzhou 已提交
2603
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
X
Xiaoyu Wang 已提交
2604
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
2605
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
2606 2607 2608 2609 2610
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2611 2612 2613 2614
  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
2615
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
2616 2617 2618
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2619

H
Haojun Liao 已提交
2620 2621
  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
2622 2623 2624
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2625

2626
  pInfo->pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
2627
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2628 2629
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
2630
  pInfo->pSlimit = (SLimitNode*)pPhyNode->node.pSlimit; //TODO: slimit now only indicate group
2631

L
Liu Jicong 已提交
2632 2633
  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2634
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2635 2636
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

L
Liu Jicong 已提交
2637 2638
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
2639 2640

  return pOperator;
2641

2642
_error:
H
Haojun Liao 已提交
2643 2644 2645 2646 2647
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2648

dengyihao's avatar
dengyihao 已提交
2649
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2650 2651 2652 2653
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
2654 2655
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;

dengyihao's avatar
opt mem  
dengyihao 已提交
2656 2657
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
D
dapan1121 已提交
2658
  int32_t                         code = 0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2659

H
Haojun Liao 已提交
2660
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2661

L
Liu Jicong 已提交
2662
  int64_t      st = taosGetTimestampUs();
2663
  void*        p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2664
  SReadHandle* pHandle = &pInfo->base.readHandle;
D
dapan1121 已提交
2665
  if (NULL == source->dataReader) {
2666
    code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL);
D
dapan1121 已提交
2667 2668 2669
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
dengyihao's avatar
dengyihao 已提交
2670
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2671

D
dapan1121 已提交
2672
  pInfo->base.dataReader = source->dataReader;
H
Haojun Liao 已提交
2673
  STsdbReader* reader = pInfo->base.dataReader;
X
Xiaoyu Wang 已提交
2674
  bool         hasNext = false;
2675
  qTrace("tsdb/read-table-data: %p, enter next reader", reader);
D
dapan1121 已提交
2676 2677

  while (true) {
2678
    code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
D
dapan1121 已提交
2679
    if (code != 0) {
2680
      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
D
dapan1121 已提交
2681 2682 2683 2684 2685 2686 2687
      pInfo->base.dataReader = NULL;
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
2688

H
Haojun Liao 已提交
2689
    if (isTaskKilled(pTaskInfo)) {
2690
      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
D
dapan1121 已提交
2691
      pInfo->base.dataReader = NULL;
2692
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2693 2694 2695
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2696
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2697 2698 2699 2700
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2701
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2702 2703 2704 2705
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2706 2707

    uint32_t status = 0;
2708
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2709
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2710
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2711
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2712 2713
    }

2714 2715 2716 2717
    if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
      break;
    }

dengyihao's avatar
opt mem  
dengyihao 已提交
2718 2719 2720 2721 2722
    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

2723
    pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2724

H
Haojun Liao 已提交
2725
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2726
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2727

2728
    qTrace("tsdb/read-table-data: %p, close reader", reader);
H
Haojun Liao 已提交
2729
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2730 2731
    return pBlock;
  }
H
Haojun Liao 已提交
2732

D
dapan1121 已提交
2733 2734
  pAPI->tsdReader.tsdReaderClose(source->dataReader);
  source->dataReader = NULL;
H
Haojun Liao 已提交
2735
  pInfo->base.dataReader = NULL;
D
dapan1121 已提交
2736 2737
  blockDataDestroy(source->inputBlock);
  source->inputBlock = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2738 2739 2740
  return NULL;
}

2741 2742 2743
// Build the sort specification used by the table-merge scan: a single SBlockOrderInfo sorting on the
// primary-timestamp column.
//
// colMatchInfo : array of SColMatchItem. The destination slot of the (last) item whose colId is
//                PRIMARYKEY_TIMESTAMP_COL_ID is used; slot 0 is used when no such item exists.
// order        : sort order stored in the SBlockOrderInfo.
//
// Returns a new SArray of SBlockOrderInfo, or NULL if the array cannot be allocated.
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsTargetSlotId = 0;
  size_t  numOfCols = taosArrayGetSize(colMatchInfo);
  for (size_t i = 0; i < numOfCols; ++i) {
    SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
    if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      tsTargetSlotId = colInfo->dstSlotId;
    }
  }

  SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  if (pList == NULL) {
    return NULL;
  }

  SBlockOrderInfo bi = {.order = order, .slotId = tsTargetSlotId, .nullFirst = NULL_ORDER_FIRST};
  taosArrayPush(pList, &bi);

  return pList;
}

H
Haojun Liao 已提交
2761
// Copy a query condition, giving dst its own copy of the column list.
//
// All other members are copied shallowly, so any other pointer members of SQueryTableDataCond stay shared
// with src (NOTE(review): confirm no other member needs a deep copy).
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the column list cannot be allocated
// (dst->colList is NULL in that case).
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
  *dst = *src;
  dst->colList = NULL;

  if (src->numOfCols > 0) {
    dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
    if (dst->colList == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memcpy(dst->colList, src->colList, (size_t)src->numOfCols * sizeof(SColumnInfo));
  }

  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2769

2770
// Prepare the multi-way merge sort for the group that starts at pInfo->tableStartIndex.
//
// - Sets pInfo->tableEndIndex to the last consecutive table whose groupId equals pInfo->groupId.
// - Creates the sort handle.
// - Registers one sort source (input block + copy of the query condition) per table.
// - Opens the sort.
//
// Failures are reported through T_LONG_JMP on the task's jump buffer. Returns TSDB_CODE_SUCCESS.
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  int32_t              code = TSDB_CODE_SUCCESS;

  {
    size_t  numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < numOfTables; ++i) {
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  pInfo->base.dataReader = NULL;

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // Merge fan-in: (2 * max row bytes) / row size, clamped to [2, 128] and rounded down to a power of two.
  // A non-positive row size would divide by zero, so it falls back to the widest fan-in.
  int32_t rowSize = pInfo->pResBlock->info.rowSize;
  int32_t kWay = (rowSize > 0) ? (TSDB_MAX_BYTES_PER_ROW * 2) / rowSize : 128;
  if (kWay >= 128) {
    kWay = 128;
  } else if (kWay <= 2) {
    kWay = 2;
  } else {
    int32_t pow2 = 2;
    while (pow2 * 2 <= kWay) {
      pow2 *= 2;
    }
    kWay = pow2;
  }

  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  for (int32_t i = 0; i < numOfTable; ++i) {
    // Copy the condition first so that a failure does not leave a source without its condition.
    SQueryTableDataCond cond;
    code = dumpQueryTableCond(&pInfo->base.cond, &cond);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    taosArrayPush(pInfo->sortSourceParams, &param);

    taosArrayPush(pInfo->queryConds, &cond);
  }

  // Sources only reference the params stored in sortSourceParams; they are taken after all pushes are done,
  // so the element addresses stay stable.
  for (int32_t i = 0; i < numOfTable; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
    ps->param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->onlyRef = true;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    // Report the code tsortOpen returned; terrno is not guaranteed to carry it (and longjmp turns 0 into 1).
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return TSDB_CODE_SUCCESS;
}

// Tear down the merge sort of the current group.
//
// Folds the group's sort statistics into pInfo->sortExecInfo, releases every sort source (input block and
// reader), destroys the sort handle and the per-table query conditions, and resets the limit state for the
// next group. Returns TSDB_CODE_SUCCESS.
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SStorageAPI*         pAPI = &pOperator->pTaskInfo->storageAPI;

  int32_t numOfSources = taosArrayGetSize(pInfo->queryConds);

  // Accumulate this group's sort statistics into the operator-level totals reported by explain.
  SSortExecInfo groupSortInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
  pInfo->sortExecInfo.sortMethod = groupSortInfo.sortMethod;
  pInfo->sortExecInfo.sortBuffer = groupSortInfo.sortBuffer;
  pInfo->sortExecInfo.loops += groupSortInfo.loops;
  pInfo->sortExecInfo.readBytes += groupSortInfo.readBytes;
  pInfo->sortExecInfo.writeBytes += groupSortInfo.writeBytes;

  int32_t idx = 0;
  while (idx < numOfSources) {
    STableMergeScanSortSourceParam* pSrc = taosArrayGet(pInfo->sortSourceParams, idx);
    blockDataDestroy(pSrc->inputBlock);
    pAPI->tsdReader.tsdReaderClose(pSrc->dataReader);
    pSrc->dataReader = NULL;
    ++idx;
  }
  taosArrayClear(pInfo->sortSourceParams);

  tsortDestroySortHandle(pInfo->pSortHandle);
  pInfo->pSortHandle = NULL;

  size_t numOfConds = taosArrayGetSize(pInfo->queryConds);
  for (size_t c = 0; c < numOfConds; ++c) {
    SQueryTableDataCond* pCond = taosArrayGet(pInfo->queryConds, c);
    taosMemoryFree(pCond->colList);
  }
  taosArrayDestroy(pInfo->queryConds);
  pInfo->queryConds = NULL;

  resetLimitInfoForNextGroup(&pInfo->limitInfo);
  return TSDB_CODE_SUCCESS;
}

2882 2883
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2884 2885
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2886 2887 2888
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2889
  blockDataCleanup(pResBlock);
2890 2891

  while (1) {
2892
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2893 2894 2895 2896
    if (pTupleHandle == NULL) {
      break;
    }

2897 2898
    appendOneRowToDataBlock(pResBlock, pTupleHandle);
    if (pResBlock->info.rows >= capacity) {
2899 2900 2901 2902
      break;
    }
  }

D
dapan1121 已提交
2903 2904 2905 2906 2907
  if (tsortIsClosed(pHandle)) {
    terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
    T_LONG_JMP(pOperator->pTaskInfo->env, terrno);
  }

2908
  bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
D
dapan1121 已提交
2909
  qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
2910
         pInfo->limitInfo.numOfOutputRows);
2911

2912
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
2913 2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924
}

// Next-block function of the table-merge scan operator.
//
// Tables are processed group by group, one group after another. Each group is merge-sorted by
// startGroupTableMergeScan / getSortedTableMergeScanBlockData and torn down by stopGroupTableMergeScan.
// Every returned block is tagged with the group id.
//
// Returns the next block, or NULL once all groups are exhausted (the operator is then marked completed).
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);

  // First invocation: position on the first group and start its merge sort.
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;

    if (numOfTables == 0) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pInfo->tableStartIndex = 0;
    STableKeyInfo* pFirst = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
    pInfo->groupId = pFirst->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pInfo->tableStartIndex < numOfTables) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock,
                                                           pOperator->resultInfo.capacity, pOperator);
    if (pBlock != NULL) {
      pBlock->info.id.groupId = pInfo->groupId;
      pOperator->resultInfo.totalRows += pBlock->info.rows;
      return pBlock;
    }

    // Data of this group are all dumped, let's try the next group
    stopGroupTableMergeScan(pOperator);

    bool lastGroup = (pInfo->tableEndIndex >= numOfTables - 1);
    if (lastGroup) {
      setOperatorCompleted(pOperator);
      break;
    }

    pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
    STableKeyInfo* pNext = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
    pInfo->groupId = pNext->groupId;
    startGroupTableMergeScan(pOperator);
    resetLimitInfoForNextGroup(&pInfo->limitInfo);
  }

  return NULL;
}

2971
void destroyTableMergeScanOperatorInfo(void* param) {
2972
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2973
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2974

dengyihao's avatar
dengyihao 已提交
2975 2976 2977
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2978 2979
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
2980
    pTableScanInfo->base.readerAPI.tsdReaderClose(p->dataReader);
D
dapan1121 已提交
2981
    p->dataReader = NULL;
2982
  }
H
Haojun Liao 已提交
2983

2984
  pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
D
dapan1121 已提交
2985 2986
  pTableScanInfo->base.dataReader = NULL;

2987
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2988 2989
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2990

dengyihao's avatar
opt mem  
dengyihao 已提交
2991 2992 2993
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2994 2995
  }

2996
  taosArrayDestroy(pTableScanInfo->queryConds);
2997
  destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
2998 2999 3000 3001 3002

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
D
dapan1121 已提交
3003
  taosMemoryFreeClear(param);
3004 3005 3006 3007
}

// Explain callback of the table-merge scan operator.
//
// Allocates an STableMergeScanExecInfo holding a copy of the block-load recorder and the accumulated sort
// statistics, and hands it out through *pOptrExplain / *len (presumably freed by the caller — confirm).
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (with *pOptrExplain = NULL and *len = 0).
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->base.readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3020
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
3021
                                                STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
3022 3023 3024 3025 3026
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3027

3028 3029 3030
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
3031
  int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
H
Haojun Liao 已提交
3032
                                     &pInfo->base.matchInfo);
H
Haojun Liao 已提交
3033 3034 3035
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3036

H
Haojun Liao 已提交
3037
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
3038
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3039
    taosArrayDestroy(pInfo->base.matchInfo.pList);
3040 3041 3042 3043
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
3044
    SExprSupp* pSup = &pInfo->base.pseudoSup;
3045
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
3046
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
3047 3048 3049 3050
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

H
Haojun Liao 已提交
3051 3052 3053 3054 3055 3056
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

H
Haojun Liao 已提交
3057
  pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
H
Haojun Liao 已提交
3058 3059
  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
3060
  pInfo->base.readHandle = *readHandle;
3061 3062 3063

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;
3064
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
3065

3066
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3067
  pInfo->sample.seed = taosGetTimestampSec();
H
Haojun Liao 已提交
3068 3069 3070 3071 3072 3073

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
3074
  initResultSizeInfo(&pOperator->resultInfo, 1024);
H
Haojun Liao 已提交
3075
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
3076 3077
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

3078
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3079

H
Haojun Liao 已提交
3080
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
3081
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3082
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
3083

dengyihao's avatar
dengyihao 已提交
3084
  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
A
Alex Duan 已提交
3085 3086
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
3087

L
Liu Jicong 已提交
3088 3089
  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3090
  pOperator->exprSupp.numOfExprs = numOfCols;
3091

3092 3093
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
3094 3095 3096 3097 3098 3099 3100 3101 3102
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
3103 3104 3105 3106

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
3107
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
3108
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3109
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI);
S
slzhou 已提交
3110
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3111
                                                   SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI);
S
slzhou 已提交
3112 3113
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
3114 3115
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
3116 3117 3118 3119 3120 3121 3122
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

// Record the output slots of the db_name / stable_name columns found in the scan column list.
//
// Every entry must be a target node wrapping a column node; otherwise TSDB_CODE_QRY_SYS_ERROR is returned.
// A NULL list is accepted and leaves supp untouched.
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pNode = NULL;
  FOREACH(pNode, scanCols) {
    if (nodeType(pNode) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pNode;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    const char* colName = ((SColumnNode*)pTarget->pExpr)->colName;
    if (strcmp(colName, GROUP_TAG_DB_NAME) == 0) {
      supp->dbNameSlotId = pTarget->slotId;
    } else if (strcmp(colName, GROUP_TAG_STABLE_NAME) == 0) {
      supp->stbNameSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Record the output slot of the table_count() function in the pseudo column list.
//
// Every entry must be a target node wrapping a function node; otherwise TSDB_CODE_QRY_SYS_ERROR is returned.
// A NULL list is accepted and leaves supp untouched.
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pNode = NULL;
  FOREACH(pNode, pseudoCols) {
    if (nodeType(pNode) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pNode;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_FUNCTION) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    if (((SFunctionNode*)pTarget->pExpr)->funcType == FUNCTION_TYPE_TABLE_COUNT) {
      supp->tbCountSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Derive what the table-count scan groups by or filters on.
//
// With group tags: sets groupByDbName / groupByStbName for db_name / stable_name columns; any non-column entry
// yields TSDB_CODE_QRY_SYS_ERROR.
// Without group tags: copies the db name and table name of tableName into the db / stable name filters.
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags == NULL) {
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
    return TSDB_CODE_SUCCESS;
  }

  SNode* pNode = NULL;
  FOREACH(pNode, groupTags) {
    if (nodeType(pNode) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    const char* colName = ((SColumnNode*)pNode)->colName;
    if (strcmp(colName, GROUP_TAG_DB_NAME) == 0) {
      supp->groupByDbName = true;
    }
    if (strcmp(colName, GROUP_TAG_STABLE_NAME) == 0) {
      supp->groupByStbName = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Fill the table-count scan support structure from the plan node pieces.
//
// Runs three steps in order, logging and returning the first failure:
//   1. grouping/filter inputs,
//   2. output slots of the group-tag columns,
//   3. output slot of the count column.
// Slot ids default to -1, which fillTableCountScanDataBlock treats as "column not in the output".
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t code = tblCountScanGetInputs(groupTags, tableName, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return code;
  }

  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  code = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return code;
  }

  code = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
  }
  return code;
}
S
shenglian zhou 已提交
3216

S
slzhou 已提交
3217
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3218 3219 3220
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3221
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3222
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3223
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3224 3225 3226 3227 3228 3229 3230 3231 3232

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3233
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3234 3235
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3236 3237 3238
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3239 3240 3241

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3242 3243
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3255 3256 3257
/*
 * Write one result row (row 0) into pRes and set pRes->info.rows = 1.
 *
 * Each output column is written only if its slot id in pSupp is not -1:
 *   - db name slot  : dbName as a var-string (dbName must be non-empty)
 *   - stb name slot : stbName as a var-string, or NULL when stbName is ""
 *   - count slot    : the int64 `count`
 *
 * Names longer than the column capacity are truncated. The var-string length header always matches
 * the bytes actually stored, so readers never run past the buffer.
 */
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
    ASSERT(strlen(dbName));
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);

    char   varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    size_t dbNameLen = strlen(dbName);
    if (dbNameLen > TSDB_DB_NAME_LEN - 1) {
      dbNameLen = TSDB_DB_NAME_LEN - 1;  // keep room for the terminator inside the value area
    }
    memcpy(varDataVal(varDbName), dbName, dbNameLen);
    varDataSetLen(varDbName, dbNameLen);
    colDataSetVal(colInfoData, 0, varDbName, false);
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
    if (strlen(stbName) != 0) {
      char   varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      size_t stbNameLen = strlen(stbName);
      if (stbNameLen > TSDB_TABLE_NAME_LEN - 1) {
        stbNameLen = TSDB_TABLE_NAME_LEN - 1;
      }
      memcpy(varDataVal(varStbName), stbName, stbNameLen);
      varDataSetLen(varStbName, stbNameLen);
      colDataSetVal(colInfoData, 0, varStbName, false);
    } else {
      colDataSetNULL(colInfoData, 0);
    }
  }

  if (pSupp->tbCountSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
    colDataSetVal(colInfoData, 0, (char*)&count, false);
  }
  pRes->info.rows = 1;
}

S
slzhou 已提交
3287
/*
 * Mnode path of the table-count scan: count the tables of the built-in system databases
 * (TSDB_INFORMATION_SCHEMA_DB / TSDB_PERFORMANCE_SCHEMA_DB).
 * Dispatches to the grouped builder when grouping by db or stb name, otherwise to the filter builder.
 * Returns pInfo->pRes if a row was produced, NULL otherwise.
 */
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
  size_t infodbTableNum;
  size_t perfdbTableNum;
  getInfosDbMeta(NULL, &infodbTableNum);
  getPerfDbMeta(NULL, &perfdbTableNum);

  STableCountScanSupp* pSupp = &pInfo->supp;
  SSDataBlock*         pBlock = pInfo->pRes;

  bool grouped = pSupp->groupByDbName || pSupp->groupByStbName;
  if (grouped) {
    buildSysDbGroupedTableCount(pOperator, pInfo, pSupp, pBlock, infodbTableNum, perfdbTableNum);
  } else {
    buildSysDbFilterTableCount(pOperator, pSupp, pBlock, infodbTableNum, perfdbTableNum);
  }

  return (pBlock->info.rows > 0) ? pBlock : NULL;
}

S
slzhou 已提交
3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320
/*
 * Ungrouped system-db count, narrowed by pSupp->dbNameFilter:
 *   ""                         -> sum of both system databases
 *   TSDB_INFORMATION_SCHEMA_DB -> infodbTableNum
 *   TSDB_PERFORMANCE_SCHEMA_DB -> perfdbTableNum
 *   anything else              -> no row
 * The operator is marked completed in every case.
 */
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  const char* filter = pSupp->dbNameFilter;

  if (filter[0] == '\0') {
    fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
  } else if (strcmp(filter, TSDB_INFORMATION_SCHEMA_DB) == 0) {
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (strcmp(filter, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  }

  setOperatorCompleted(pOperator);
}

/*
 * Grouped system-db count, one group per call, driven by pInfo->currGrpIdx:
 *   0  -> TSDB_INFORMATION_SCHEMA_DB row
 *   1  -> TSDB_PERFORMANCE_SCHEMA_DB row
 *   2+ -> no row; operator completed
 * The group id is hashed from the db name when grouping by db, otherwise from "".
 * currGrpIdx is advanced on every call.
 */
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  char*  sysDbName = NULL;
  size_t sysDbTableNum = 0;

  switch (pInfo->currGrpIdx) {
    case 0:
      sysDbName = TSDB_INFORMATION_SCHEMA_DB;
      sysDbTableNum = infodbTableNum;
      break;
    case 1:
      sysDbName = TSDB_PERFORMANCE_SCHEMA_DB;
      sysDbTableNum = perfdbTableNum;
      break;
    default:
      break;
  }

  if (sysDbName == NULL) {
    setOperatorCompleted(pOperator);
  } else {
    pRes->info.id.groupId =
        pSupp->groupByDbName ? calcGroupId(sysDbName, strlen(sysDbName)) : calcGroupId("", 0);
    fillTableCountScanDataBlock(pSupp, sysDbName, "", sysDbTableNum, pRes);
  }

  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3346
/*
 * next() callback of the table-count scan operator.
 * Clears the result block, then returns NULL once the operator is done. Otherwise it takes the
 * system-db path (readHandle.mnd set) or the vnode path, returning the filled block or NULL.
 */
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
  STableCountScanOperatorInfo* pInfo = pOperator->info;

  blockDataCleanup(pInfo->pRes);
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  bool useSysDb = (pInfo->readHandle.mnd != NULL);
  return useSysDb ? buildSysDbTableCount(pOperator, pInfo)
                  : buildVnodeDbTableCount(pOperator, pInfo, &pInfo->supp, pInfo->pRes);
}

/*
 * Vnode path of the table-count scan.
 * Resolves the vnode's database name (parsed with T_NAME_ACCT | T_NAME_DB, keeping only the db part)
 * and vgroup id, then dispatches to the grouped or the filtered builder.
 * Returns pRes if a row was produced, NULL otherwise.
 */
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;

  const char* fullDbName = NULL;
  int32_t     vgId = 0;
  pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &fullDbName, &vgId, NULL, NULL);

  SName name = {0};
  char  dbName[TSDB_DB_NAME_LEN] = {0};
  tNameFromString(&name, fullDbName, T_NAME_ACCT | T_NAME_DB);
  tNameGetDbName(&name, dbName);

  if (!pSupp->groupByDbName && !pSupp->groupByStbName) {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
  } else {
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
  }

  return (pRes->info.rows > 0) ? pRes : NULL;
}

/*
 * Grouped vnode count.
 *
 * Group by db only: one row with the vnode's total table count, then the operator completes.
 *
 * Group by stb: lazily loads the super-table uid list once into pInfo->stbUidList, then emits one group
 * per call. Indexes [0, #stb) emit one row per super table, index #stb emits the normal-table group,
 * and any later call completes the operator. Failure to load the list is only logged.
 */
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;

  if (!pSupp->groupByStbName) {
    pRes->info.id.groupId = calcGroupId(dbName, strlen(dbName));

    int64_t totalTables = 0;
    pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &totalTables, NULL);
    fillTableCountScanDataBlock(pSupp, dbName, "", totalTables, pRes);
    setOperatorCompleted(pOperator);
    return;
  }

  if (pInfo->stbUidList == NULL) {
    pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
    if (pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, TSDB_SUPER_TABLE, pInfo->stbUidList) < 0) {
      qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
    }
  }

  size_t numOfStb = taosArrayGetSize(pInfo->stbUidList);
  if (pInfo->currGrpIdx < numOfStb) {
    tb_uid_t uid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
    buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, uid, pAPI);
    pInfo->currGrpIdx++;
  } else if (pInfo->currGrpIdx == numOfStb) {
    buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName, pAPI);
    pInfo->currGrpIdx++;
  } else {
    setOperatorCompleted(pOperator);
  }
}

/*
 * Ungrouped vnode count.
 * If both a db-name and an stb-name filter are set, emits the child-table count of that super table
 * (uid looked up by name). Otherwise emits the vnode's total table count.
 * Always completes the operator.
 */
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;

  bool filterByStb = (pSupp->dbNameFilter[0] != '\0') && (pSupp->stbNameFilter[0] != '\0');
  if (filterByStb) {
    uint64_t stbUid = 0;
    int64_t  childCount = 0;
    pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &stbUid);
    pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &childCount);
    fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, childCount, pRes);
  } else {
    int64_t tableCount = 0;
    pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &tableCount, NULL);
    fillTableCountScanDataBlock(pSupp, dbName, "", tableCount, pRes);
  }

  setOperatorCompleted(pOperator);
}

static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3449
                                           SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI) {
S
slzhou 已提交
3450
  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
D
dapan1121 已提交
3451 3452 3453
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
  }
X
Xiaoyu Wang 已提交
3454

S
slzhou 已提交
3455 3456
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;
3457

3458
  int64_t numOfTables = 0;
3459
  pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, NULL, &numOfTables);
H
Haojun Liao 已提交
3460

3461 3462
  if (numOfTables != 0) {
    fillTableCountScanDataBlock(pSupp, dbName, "", numOfTables, pRes);
3463
  }
S
slzhou 已提交
3464 3465 3466
}

static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3467
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI) {
S
slzhou 已提交
3468
  char stbName[TSDB_TABLE_NAME_LEN] = {0};
3469
  pAPI->metaFn.getTableNameByUid(pInfo->readHandle.vnode, stbUid, stbName);
S
slzhou 已提交
3470 3471

  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
D
dapan1121 已提交
3472
  if (pSupp->groupByDbName) {
H
Haojun Liao 已提交
3473
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, varDataVal(stbName));
D
dapan1121 已提交
3474
  } else {
H
Haojun Liao 已提交
3475
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s", varDataVal(stbName));
D
dapan1121 已提交
3476
  }
X
Xiaoyu Wang 已提交
3477

S
slzhou 已提交
3478 3479 3480
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;

H
Haojun Liao 已提交
3481 3482
  int64_t ctbNum = 0;
  int32_t code = pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &ctbNum);
H
Haojun Liao 已提交
3483
  fillTableCountScanDataBlock(pSupp, dbName, varDataVal(stbName), ctbNum, pRes);
S
shenglian zhou 已提交
3484 3485 3486
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3487
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3488 3489
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3490
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3491 3492
  taosMemoryFreeClear(param);
}
dengyihao's avatar
dengyihao 已提交
3493 3494

// clang-format on