scanoperator.c 132.7 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16 17
// clang-format off

18
#include "executorInt.h"
H
Haojun Liao 已提交
19
#include "filter.h"
20
#include "function.h"
21
#include "functionMgt.h"
L
Liu Jicong 已提交
22
#include "os.h"
H
Haojun Liao 已提交
23
#include "querynodes.h"
24
#include "systable.h"
H
Haojun Liao 已提交
25
#include "tname.h"
26
#include "ttime.h"
H
Haojun Liao 已提交
27 28 29 30 31 32 33 34

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
35 36
#include "operator.h"
#include "querytask.h"
H
Haojun Liao 已提交
37

38 39 40
#include "storageapi.h"
#include "wal.h"

D
dapan1121 已提交
41 42
/* Global scan debug switch, 0 (off) by default. NOTE(review): its readers are outside this part of the file. */
int32_t scanDebug = 0;

/* Table-count threshold for a single reader (usage not visible here). */
#define MULTI_READER_MAX_TABLE_NUM   5000

/* Mark a scan-info object as running a reverse (descending) scan. */
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)

/* Toggle an order value in place: anything other than ascending becomes ascending, ascending becomes descending. */
#define SWITCH_ORDER(n)              (((n) = ((n) != TSDB_ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC))

/* Name used for the stream scan operator. */
#define STREAM_SCAN_OP_NAME          "StreamScanOperator"
47

H
Haojun Liao 已提交
48 49 50 51 52 53 54 55 56 57
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;
  SSortExecInfo          sortExecInfo;
} STableMergeScanExecInfo;

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
  uint64_t       uid;
  SSDataBlock*   inputBlock;
D
dapan1121 已提交
58
  STsdbReader*   dataReader;
H
Haojun Liao 已提交
59 60
} STableMergeScanSortSourceParam;

61 62 63 64 65 66 67 68 69 70
/* State of the table count scan operator. */
typedef struct STableCountScanOperatorInfo {
  SReadHandle  readHandle;  // handle used to access the vnode
  SSDataBlock* pRes;        // presumably the result block — confirm against the operator implementation

  STableCountScanSupp supp;

  int32_t currGrpIdx;  // index of the group currently being produced
  SArray* stbUidList;  // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;

L
Liu Jicong 已提交
71
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);

/*
 * Block-sampling gate: decides whether the current data block should be processed.
 *
 * Sampling is switched off at the moment, so every block is processed. The sampling logic
 * based on pInfo->sampleRatio and pInfo->seed is kept below, compiled out, for reference.
 */
static bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#endif

  (void)pInfo;  // unused while sampling is disabled
  return true;
}

86
/* Flip the scan order stored in each of the first numOfOutput function contexts. */
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  for (int32_t idx = 0; idx < numOfOutput; idx++) {
    SqlFunctionCtx* pOne = &pCtx[idx];
    SWITCH_ORDER(pOne->order);
  }
}

92
/*
 * Check whether a data block's time range touches more than one interval window.
 *
 * The window containing the block's first key (in scan `order`) is obtained with
 * getAlignQueryTimeWindow(). If the block extends beyond that window, or a following window
 * produced by getNextTimeWindow() still overlaps the block, the function returns true.
 *
 * @return false when pInterval->interval is 0 (no upstream interval operator) or the block
 *         lies inside a single window; true otherwise.
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  // 0 by default, which means it is not a interval operator of the upstream operator.
  if (pInterval->interval == 0) {
    return false;
  }

  const STimeWindow* pRange = &pBlockInfo->window;
  STimeWindow        w = {0};

  if (order == TSDB_ORDER_ASC) {
    w = getAlignQueryTimeWindow(pInterval, pRange->skey);
    ASSERT(w.ekey >= pRange->skey);

    if (w.ekey < pRange->ekey) {
      return true;  // the block runs past the window holding its first key
    }

    for (;;) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.skey > pRange->ekey) {
        return false;  // the next window starts after the block ends
      }

      ASSERT(w.ekey > pRange->ekey);
      if (TMAX(w.skey, pRange->skey) <= pRange->ekey) {
        return true;
      }
    }
  }

  // descending order: start from the window that holds the block's last key and walk backwards
  w = getAlignQueryTimeWindow(pInterval, pRange->ekey);
  ASSERT(w.skey <= pRange->ekey);

  if (w.skey > pRange->skey) {
    return true;  // the block starts before the window holding its last key
  }

  for (;;) {
    getNextTimeWindow(pInterval, &w, order);
    if (w.ekey < pRange->skey) {
      return false;  // the previous window ends before the block starts
    }

    ASSERT(w.skey < pRange->skey);
    if (pRange->skey <= TMIN(w.ekey, pRange->ekey)) {
      return true;
    }
  }
}

143 144 145 146 147 148 149 150 151 152 153
/*
 * Look up the push-down aggregate result row of group `groupId` for a table scan operator.
 *
 * Only QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN operators are supported; any other type yields NULL.
 * On success *pPage receives the buffer page holding the row, which the caller must release.
 *
 * @return the result row, or NULL when the operator type does not match, the group has no row,
 *         or the page cannot be fetched (in which case *pPage is NULL).
 */
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  int64_t key[2] = {0};
  SET_RES_WINDOW_KEY((char*)key, &groupId, sizeof(groupId), groupId);

  STableScanInfo*     pScanInfo = pOperator->info;
  SResultRowPosition* pPos = (SResultRowPosition*)tSimpleHashGet(
      pScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, key, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
  if (pPos == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pScanInfo->base.pdInfo.pAggSup->pResultBuf, pPos->pageId);
  return (*pPage == NULL) ? NULL : (SResultRow*)((char*)(*pPage) + pPos->offset);
}

169 170 171 172 173 174 175 176
/*
 * Record `uid` in the scan's ignore list so that later blocks of the same table can be skipped.
 *
 * The hash table is created lazily, sized by the number of tables in the table list. The value
 * stored for the uid is the current scanTimes.
 *
 * @return TSDB_CODE_SUCCESS, or an error code when the hash table cannot be created or updated.
 */
static int32_t insertTableToScanIgnoreList(STableScanInfo* pTableScanInfo, uint64_t uid) {
  if (NULL == pTableScanInfo->pIgnoreTables) {
    int32_t tableNum = (int32_t)taosArrayGetSize(pTableScanInfo->base.pTableListInfo->pTableList);
    pTableScanInfo->pIgnoreTables =
        taosHashInit(tableNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (NULL == pTableScanInfo->pIgnoreTables) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // The insertion can fail (e.g. out of memory); report it instead of silently dropping the uid.
  int32_t code = taosHashPut(pTableScanInfo->pIgnoreTables, &uid, sizeof(uid), &pTableScanInfo->scanTimes,
                             sizeof(pTableScanInfo->scanTimes));
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): taosHashPut may return a generic failure marker; prefer terrno when it carries a reason.
    return (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

183 184
/*
 * Decide, from the push-down aggregate results accumulated so far, whether the current block can
 * be skipped without loading its data.
 *
 * The block is skipped only if every push-down function reports FUNC_DATA_REQUIRED_NOT_LOAD for
 * the block's time window. In that case *status is set to FUNC_DATA_REQUIRED_NOT_LOAD and the
 * block's table is added to the scan ignore list. Otherwise *status is left unchanged.
 *
 * @return TSDB_CODE_SUCCESS, or the error from insertTableToScanIgnoreList().
 */
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pTableScanInfo = pOperator->info;

  // no push-down expressions: nothing to prune against
  SExprSupp* pSup = pTableScanInfo->base.pdInfo.pExprSup;
  if (pSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pPage = NULL;
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // stop at the first function that still needs the block data
  bool skipBlock = true;
  for (int32_t i = 0; skipBlock && i < pSup->numOfExprs; ++i) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pSup->rowEntryInfoOffset);
    int32_t              reqStatus = fmFuncDynDataRequired(pSup->pCtx[i].functionId, pEntry, &pBlockInfo->window);
    skipBlock = (reqStatus == FUNC_DATA_REQUIRED_NOT_LOAD);
  }

  // release buffer pages
  releaseBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, pPage);

  if (!skipBlock) {
    return TSDB_CODE_SUCCESS;
  }

  *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  return insertTableToScanIgnoreList(pTableScanInfo, pBlockInfo->id.uid);
}

H
Haojun Liao 已提交
224
/*
 * Evaluate the filter against the block's SMA (per-column aggregates).
 *
 * @return false only when filterRangeExecute() proves no row can match; true (keep the block)
 *         when there is no SMA or no filter to evaluate.
 */
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                               int32_t numOfRows) {
  const bool canEvaluate = (pColsAgg != NULL) && (pFilterInfo != NULL);
  return canEvaluate ? filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows) : true;
}

H
Haojun Liao 已提交
234
/*
 * Ask the reader for the SMA of the current block.
 *
 * @return true only when every column has SMA and none of it is null; long-jumps to the task
 *         environment if the reader reports an error.
 */
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  bool allColumnsHaveAgg = true;
  bool hasNullSMA = false;

  int32_t code = pTaskInfo->storageAPI.tsdReader.tsdReaderRetrieveBlockSMAInfo(pTableScanInfo->dataReader, pBlock,
                                                                              &allColumnsHaveAgg, &hasNullSMA);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return allColumnsHaveAgg && !hasNullSMA;
}

H
Haojun Liao 已提交
250
/*
 * Fill the tag / pseudo columns of pBlock for `rows` rows, if the scan has any such expressions.
 *
 * TSDB_CODE_PAR_TABLE_NOT_EXIST is tolerated; any other error long-jumps to the task environment.
 * terrno is reset afterwards.
 */
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
                               int32_t rows) {
  SExprSupp* pSup = &pTableScanInfo->pseudoSup;
  if (pSup->numOfExprs <= 0) {
    return;
  }

  int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
                                        GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);

  // the table may have been dropped during the scan, so a missing table is not an error here
  bool fatal = (code != TSDB_CODE_SUCCESS) && (code != TSDB_CODE_PAR_TABLE_NOT_EXIST);
  if (fatal) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // reset the error code.
  terrno = 0;
}

267
/*
 * Apply the offset and limit of pLimitInfo to pBlock in place.
 *
 * Rows still covered by the remaining offset are dropped first. If the block lies entirely inside
 * the offset it is emptied and false is returned. Then, when a limit is set (limit != -1) and this
 * block reaches it, the block is truncated.
 *
 * @return true when the limit has been reached with this block, false otherwise.
 */
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  const char* idStr = GET_TASKID(pTaskInfo);
  SLimit*     pLimit = &pLimitInfo->limit;

  // 1. consume the remaining offset
  if (pLimitInfo->remainOffset > 0) {
    if (pLimitInfo->remainOffset < pBlock->info.rows) {
      blockDataTrimFirstRows(pBlock, pLimitInfo->remainOffset);
      pLimitInfo->remainOffset = 0;
    } else {
      pLimitInfo->remainOffset -= pBlock->info.rows;
      blockDataEmpty(pBlock);
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, idStr);
      return false;
    }
  }

  // 2. apply the limit
  bool limitReached =
      (pLimit->limit != -1) && (pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows));
  if (limitReached) {
    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, keep);
  }

  pLimitInfo->numOfOutputRows += pBlock->info.rows;

  if (limitReached) {
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, idStr);
  }
  return limitReached;
}

H
Haojun Liao 已提交
297
/*
 * Decide how the reader's current data block should be consumed, and load it if needed.
 *
 * Depending on the load flag, an upstream interval, the block SMA and dynamic pruning, the block is:
 *   - filtered out (FUNC_DATA_REQUIRED_FILTEROUT / FUNC_DATA_REQUIRED_ALL_FILTEROUT),
 *   - skipped with only tag/pseudo columns filled (FUNC_DATA_REQUIRED_NOT_LOAD),
 *   - answered by its SMA (FUNC_DATA_REQUIRED_SMA_LOAD), or
 *   - fully loaded, filtered and trimmed by limit/offset (FUNC_DATA_REQUIRED_DATA_LOAD).
 *
 * @param pOperator       scan operator owning the reader
 * @param pTableScanInfo  scan state (reader, filters, recorder, limit info)
 * @param pBlock          block whose info was filled by the reader; receives the data when loaded
 * @param status          out: FUNC_DATA_REQUIRED_* describing what happened to the block
 * @return TSDB_CODE_SUCCESS, or an error from dynamic pruning or data block retrieval.
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*            pAPI = &pTaskInfo->storageAPI;
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
  SDataBlockInfo*         pBlockInfo = &pBlock->info;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  bool loadSMA = false;
  *status = pTableScanInfo->dataBlockLoadFlag;
  if (pOperator->exprSupp.pFilterInfo != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    // NOTE(review): totalRows was already incremented on entry, so a filtered-out block is counted twice — confirm.
    pCost->totalRows += pBlock->info.rows;
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows,
           pBlockInfo->id.uid);
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
    pCost->skipBlocks += 1;
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
    pCost->loadBlockStatis += 1;
    loadSMA = true;  // mark the operation of load sma;
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
      return TSDB_CODE_SUCCESS;
    } else {
      // failed to load the block sma data, data block statistics does not exist, load data block instead
      qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      size_t size = taosArrayGetSize(pBlock->pDataBlock);
      bool   keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
      if (!keep) {
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
               GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
        pCost->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

        pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results
  int32_t code = doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (code != TSDB_CODE_SUCCESS) {
    // e.g. the ignore-table list could not be updated; the error must reach the caller
    qError("%s failed to prune data block dynamically, code:%s", GET_TASKID(pTaskInfo), tstrerror(code));
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
    return code;
  }

  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);

    // once every table of the list is ignored, nothing else can contribute to the result
    STableScanInfo* p1 = pOperator->info;
    if (taosHashGetSize(p1->pIgnoreTables) == taosArrayGetSize(p1->base.pTableListInfo->pTableList)) {
      *status = FUNC_DATA_REQUIRED_ALL_FILTEROUT;
    } else {
      *status = FUNC_DATA_REQUIRED_FILTEROUT;
    }
    return TSDB_CODE_SUCCESS;
  }

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SSDataBlock* p = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (p == NULL) {
    return terrno;
  }

  ASSERT(p == pBlock);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);

  // restore the previous value
  pCost->totalRows -= pBlock->info.rows;

  if (pOperator->exprSupp.pFilterInfo != NULL) {
    int64_t st = taosGetTimestampUs();
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);

    double el = (taosGetTimestampUs() - st) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;

    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
  }

  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
  if (limitReached) {  // set operator flag is done
    setOperatorCompleted(pOperator);
  }

  pCost->totalRows += pBlock->info.rows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
427
/*
 * Switch the scan state to a descending pass: set the reverse-scan flag, flip the order of the
 * first numOfOutput function contexts, and swap the query window bounds.
 */
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);

  // same time range, traversed from the end: swap the window bounds along with the order
  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
  TSWAP(pTableScanInfo->cond.twindows.skey, pTableScanInfo->cond.twindows.ekey);
}

436 437
typedef struct STableCachedVal {
  const char* pName;
438
  STag*       pTags;
439 440
} STableCachedVal;

441 442 443 444 445 446 447 448 449 450 451
/* Free an STableCachedVal created by createTableCacheVal(), including its name and tags; NULL is ignored. */
static void freeTableCachedVal(void* param) {
  STableCachedVal* pVal = param;
  if (pVal != NULL) {
    taosMemoryFree((void*)pVal->pName);
    taosMemoryFree(pVal->pTags);
    taosMemoryFree(pVal);
  }
}

H
Haojun Liao 已提交
452 453
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
454
  pVal->pName = taosStrdup(pMetaReader->me.name);
H
Haojun Liao 已提交
455 456 457 458
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
459
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
460 461 462 463 464 465 466
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

467
/*
 * Deleter passed to taosLRUCacheInsert() for table meta entries
 * (callback shape: const void *key, size_t keyLen, void *value, void *ud).
 * Only the stored value needs to be released.
 */
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value, void* ud) {
  freeTableCachedVal(value);

  // part of the callback signature, not needed here
  (void)key;
  (void)keyLen;
  (void)ud;
}
474

475 476 477 478 479
/* Set the first pBlock->info.rows rows of the output column of each of the numOfExpr expressions to NULL. */
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
  for (int32_t k = 0; k < numOfExpr; k++) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pExpr[k].base.resSchema.slotId);
    colDataSetNNULL(pCol, 0, pBlock->info.rows);
  }
}

484 485
/*
 * Fill the tbname pseudo column and the tag columns of pBlock for the table pBlock->info.id.uid.
 *
 * Table meta is read directly from the meta store when pCache is NULL; otherwise the LRU meta cache
 * is consulted first and populated on a miss. The first `rows` rows are written, and
 * pBlock->info.rows is restored to its original value on every exit path.
 *
 * @return TSDB_CODE_SUCCESS on success. When the meta lookup fails, the error reported in terrno is
 *         returned; for TSDB_CODE_PAR_TABLE_NOT_EXIST the requested columns are set to NULL first.
 *         Otherwise the error from writing column data is returned.
 */
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
  // currently only the tbname pseudo column
  if (numOfExpr <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t         code = TSDB_CODE_SUCCESS;
  bool            freeReader = false;
  STableCachedVal val = {0};
  SMetaReader     mr = {0};
  LRUHandle*      h = NULL;

  // backup the rows; they are restored at _end on every path
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

  // 1. check if it is existed in meta cache
  if (pCache != NULL) {
    pCache->metaFetch += 1;
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
  }

  if (h != NULL) {
    // Cache hit. The handle is held until _end so the cached value cannot be freed while val points into it.
    pCache->cacheHit += 1;
    STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
    val = *pVal;
  } else {
    // 2. no cache or cache miss: read the entry from the meta store
    int32_t readerFlags = (pCache == NULL) ? META_READER_NOLOCK : 0;
    pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, readerFlags, &pHandle->api.metaFn);
    freeReader = true;

    code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
    if (code != TSDB_CODE_SUCCESS) {
      // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);

        // append null value before return to caller, since the caller will ignore this error code and proceed
        doSetNullValue(pBlock, pExpr, numOfExpr);
      } else {
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
      }
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : code;
      goto _end;
    }

    pHandle->api.metaReaderFn.readerReleaseLock(&mr);

    // The reader stays alive until _end, so val may borrow its memory. It must not alias the
    // cache entry below, whose lifetime is controlled by the cache (and which is freed on insert failure).
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;

    if (pCache != NULL) {
      // cache entries carry tags only for child tables; keep the same view for this lookup
      if (mr.me.type != TSDB_CHILD_TABLE) {
        val.pTags = NULL;
      }

      // populating the cache is best effort: on failure, continue with the reader's data
      STableCachedVal* pVal = createTableCacheVal(&mr);
      if (pVal == NULL) {
        qError("failed to create table meta cache entry, uid:0x%" PRIx64 ", %s", pBlock->info.id.uid, idStr);
      } else {
        int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
                                         sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
        if (ret != TAOS_LRU_STATUS_OK) {
          qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
          freeTableCachedVal(pVal);
        }
      }
    }
  }

  if (pCache != NULL) {
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
  }

  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);

    int32_t functionId = pExpr1->pExpr->_function.functionId;

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
      continue;
    }

    // these are tags
    STagVal tagVal = {0};
    tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
    const char* p = pHandle->api.metaFn.extractTagVal(val.pTags, pColInfoData->info.type, &tagVal);

    char* data = NULL;
    if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
      data = tTagValToData((const STagVal*)p, false);
    } else {
      data = (char*)p;
    }

    bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
    if (isNullVal) {
      colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
    } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
      code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
      if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
        taosMemoryFree(data);
      }
      if (code != TSDB_CODE_SUCCESS) {
        goto _end;
      }
    } else {  // todo opt for json tag
      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
        code = colDataSetVal(pColInfoData, i, data, false);
        if (code != TSDB_CODE_SUCCESS) {
          goto _end;
        }
      }
    }
  }

_end:
  if (h != NULL) {
    taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
  }
  if (freeReader) {
    pHandle->api.metaReaderFn.clearReader(&mr);
  }

  // restore the rows
  pBlock->info.rows = backupRows;
  return code;
}

H
Haojun Liao 已提交
633
/*
 * Fill pColInfoData with the result of the scalar pseudo-column function `functionId` (used for
 * tbname) applied to the table name `name`.
 *
 * The name is placed into a one-row VARCHAR scratch column, which is passed to the function's
 * scalar `process` callback with numOfRows = pBlock->info.rows. The function returns nothing, so
 * failures are only logged, and pColInfoData is left untouched in that case.
 */
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

  size_t len = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
  char   buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(buf, name);

  SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, len, 1);

  // The scratch column must own a buffer before the name is written into it; an unchecked
  // allocation failure here would make colDataSetVal write through an unallocated buffer.
  int32_t code = colInfoDataEnsureCapacity(&infoData, 1, false);
  if (code == TSDB_CODE_SUCCESS) {
    code = colDataSetVal(&infoData, 0, buf, false);
  }

  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to prepare table name column, functionId:%d, code:%s", functionId, tstrerror(code));
  } else if (fpSet.process != NULL) {
    SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData};
    SScalarParam param = {.columnData = pColInfoData};
    fpSet.process(&srcParam, 1, &param);
  } else {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  }

  colDataDestroy(&infoData);
}

658
/*
 * Pull data blocks from the tsdb reader until one survives loading and filtering.
 *
 * @return the operator's result block, or NULL when the reader is exhausted, the operator is
 *         already done, or every remaining table has been filtered out. Long-jumps to the task
 *         environment on reader/load errors or when the task has been killed.
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
  SSDataBlock*    pBlock = pScanInfo->pResBlock;
  bool            hasNext = false;
  int64_t         startUs = taosGetTimestampUs();

  for (;;) {
    int32_t code = pAPI->tsdReader.tsdNextDataBlock(pScanInfo->base.dataReader, &hasNext);
    if (code != TSDB_CODE_SUCCESS) {
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reader exhausted
    if (!hasNext) {
      return NULL;
    }

    if (isTaskKilled(pTaskInfo)) {
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (pOperator->status == OP_EXEC_DONE) {
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pScanInfo->base.dataReader);
      return NULL;
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pScanInfo->sample)) {
      continue;
    }

    if (pBlock->info.id.uid != 0) {
      pBlock->info.id.groupId = getTableGroupId(pScanInfo->base.pTableListInfo, pBlock->info.id.uid);
    }

    uint32_t status = 0;
    code = loadDataBlock(pOperator, &pScanInfo->base, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // every table in the list has been pruned: stop scanning
    if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
      return NULL;
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    pOperator->resultInfo.totalRows = pScanInfo->base.readRecorder.totalRows;
    pScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - startUs) / 1000.0;
    pOperator->cost.totalCost = pScanInfo->base.readRecorder.elapsedTime;

    pBlock->info.scanFlag = pScanInfo->base.scanFlag;
    return pBlock;
  }
}

H
Haojun Liao 已提交
725
/*
 * Run the scan passes requested by the query over the tables currently bound to the reader.
 *
 * First `scanInfo.numOfAsc` ascending passes are made, then `scanInfo.numOfDesc` descending
 * passes. `scanTimes` counts completed passes so a later call resumes where this one stopped.
 * Returns the next block produced by doTableScanImpl(), or NULL when every pass is exhausted,
 * when no reader exists (no qualified table), or when the operator is already done.
 */
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
  STableScanBase* pBase = &pInfo->base;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pBase->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // ascending passes come first
  while (pInfo->scanTimes < pInfo->scanInfo.numOfAsc) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pInfo->scanTimes++;
    taosHashClear(pInfo->pIgnoreTables);
    if (pInfo->scanTimes >= pInfo->scanInfo.numOfAsc) {
      break;
    }

    // another ascending pass is needed: rewind the reader for the next round
    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = MAIN_SCAN;
    pBase->dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
    qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));
    pAPI->tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
  }

  int32_t totalScans = pInfo->scanInfo.numOfAsc + pInfo->scanInfo.numOfDesc;
  if (pInfo->scanTimes >= totalScans) {
    return NULL;
  }

  // flip the reader to descending order before the first descending pass
  if (pBase->cond.order == TSDB_ORDER_ASC) {
    prepareForDescendingScan(pBase, pOperator->exprSupp.pCtx, 0);
    pAPI->tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
  }

  while (pInfo->scanTimes < totalScans) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pInfo->scanTimes++;
    taosHashClear(pInfo->pIgnoreTables);
    if (pInfo->scanTimes >= totalScans) {
      break;
    }

    // another descending pass is needed: rewind the reader for the next round
    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = MAIN_SCAN;
    qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
    pAPI->tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
  }

  return NULL;
}

/*
 * Return the next result block of the table scan operator.
 *
 * Two modes are supported:
 *  - TABLE_SCAN__TABLE_ORDER: tables are scanned one at a time; once the current table yields no
 *    more data, the next entry of the table list is bound to the reader and scanning continues.
 *  - otherwise tables are scanned group by group; the reader is opened lazily for the first group
 *    and re-targeted at each following group.
 *
 * Returns NULL when every table/group has been scanned. In group mode the operator is then
 * marked completed.
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;

  // scan table one by one sequentially
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    int32_t       numOfTables = 0;
    STableKeyInfo tInfo = {0};

    while (1) {
      SSDataBlock* result = doGroupedTableScan(pOperator);
      if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
        return result;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;

      // the table list is read under the task lock (presumably it can be updated concurrently)
      taosRLockLatch(&pTaskInfo->lock);
      numOfTables = tableListGetSize(pInfo->base.pTableListInfo);

      if (pInfo->currentTable >= numOfTables) {
        qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
        taosRUnLockLatch(&pTaskInfo->lock);
        return NULL;
      }

      tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
      taosRUnLockLatch(&pTaskInfo->lock);

      pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
             pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));

      pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;
    }
  } else {  // scan table group by group sequentially
    if (pInfo->currentGroupId == -1) {
      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
        setOperatorCompleted(pOperator);
        return NULL;
      }

      int32_t        num = 0;
      STableKeyInfo* pList = NULL;
      tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
      ASSERT(pInfo->base.dataReader == NULL);

      int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num,
                                                   pInfo->pResBlock, (void**)&pInfo->base.dataReader,
                                                   GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables);
      if (code != TSDB_CODE_SUCCESS) {
        T_LONG_JMP(pTaskInfo->env, code);
      }

      if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
        pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
      }
    }

    SSDataBlock* result = doGroupedTableScan(pOperator);
    if (result != NULL) {
      return result;
    }

    // The current group is exhausted. Advance through the following groups until one yields data;
    // a group without any data must not end the scan, otherwise all later groups would be skipped.
    while (1) {
      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
        setOperatorCompleted(pOperator);
        return NULL;
      }

      // reset value for the next group data output
      pOperator->status = OP_OPENED;
      resetLimitInfoForNextGroup(&pInfo->base.limitInfo);

      int32_t        num = 0;
      STableKeyInfo* pList = NULL;
      tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);

      pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num);
      pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;

      result = doGroupedTableScan(pOperator);
      if (result != NULL) {
        return result;
      }
    }
  }
}

879 880
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
881
  STableScanInfo*         pTableScanInfo = pOptr->info;
H
Haojun Liao 已提交
882
  *pRecorder = pTableScanInfo->base.readRecorder;
883 884 885 886 887
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

888
/*
 * Release the resources held by a table-scan base, in this order: the query condition, the data
 * reader (closed through pAPI, then cleared), the column-match list, the table list, the table
 * meta cache and the pseudo-column expression support.
 */
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
  cleanupQueryTableDataCond(&pBase->cond);

  // close the reader and drop the handle so the base never keeps a dangling pointer
  pAPI->tsdReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;

  if (NULL != pBase->matchInfo.pList) {
    taosArrayDestroy(pBase->matchInfo.pList);
  }

  tableListDestroy(pBase->pTableListInfo);
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}

static void destroyTableScanOperatorInfo(void* param) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  blockDataDestroy(pTableScanInfo->pResBlock);
906
  taosHashCleanup(pTableScanInfo->pIgnoreTables);
H
Haojun Liao 已提交
907
  destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
D
dapan1121 已提交
908
  taosMemoryFreeClear(param);
909 910
}

911
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
912
                                           STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
X
Xiaoyu Wang 已提交
913
  int32_t         code = 0;
H
Haojun Liao 已提交
914 915 916
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
917
    code = TSDB_CODE_OUT_OF_MEMORY;
918
    goto _error;
H
Haojun Liao 已提交
919 920
  }

921
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
922
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
923 924

  int32_t numOfCols = 0;
X
Xiaoyu Wang 已提交
925
  code =
H
Haojun Liao 已提交
926
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
927 928 929 930
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
931
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
932
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
933
  if (code != TSDB_CODE_SUCCESS) {
934
    goto _error;
935 936
  }

H
Haojun Liao 已提交
937
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
938
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
939
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
940
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
941 942
  }

943
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
G
Ganlin Zhao 已提交
944
  pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
H
Haojun Liao 已提交
945

H
Haojun Liao 已提交
946 947
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
948 949
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

950 951
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
952

H
Haojun Liao 已提交
953
  pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
H
Haojun Liao 已提交
954
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
955
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
X
Xiaoyu Wang 已提交
956
  //  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
957

H
Haojun Liao 已提交
958 959 960
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
961 962
  }

wmmhello's avatar
wmmhello 已提交
963
  pInfo->currentGroupId = -1;
964
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
965
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
966

L
Liu Jicong 已提交
967 968
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
969
  pOperator->exprSupp.numOfExprs = numOfCols;
970

971
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
972 973
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
974 975 976
    code = terrno;
    goto _error;
  }
977

D
dapan1121 已提交
978 979 980 981
  if (scanDebug) {
    pInfo->countOnly = true;
  }

H
Haojun Liao 已提交
982
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
983 984
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
985 986 987

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
988
  return pOperator;
989

990
_error:
991 992 993
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
994

995 996
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
997
  return NULL;
H
Haojun Liao 已提交
998 999
}

1000
/*
 * Build a sequential table scan operator that reads through an already opened reader.
 *
 * pReadHandle is stored as the scan's data reader. Returns NULL and sets pTaskInfo->code to
 * TSDB_CODE_OUT_OF_MEMORY if either allocation fails.
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    taosMemoryFree(pInfo);
    taosMemoryFree(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->base.dataReader = pReadHandle;

  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
  return pOperator;
}

1013
/* Discard every buffered stream block and rewind the consumer index to the first slot. */
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  int32_t numOfBuffered = (int32_t)taosArrayGetSize(pInfo->pBlockLists);
  qDebug("clear buff blocks:%d", numOfBuffered);

  taosArrayClear(pInfo->pBlockLists);
  pInfo->validBlockIndex = 0;
}

1019
/* True when the stream scan feeds a session-window aggregate. */
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  bool isSession = (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == pInfo->windowSup.parentType);
  return isSession;
}

1023
/* True when the stream scan feeds a state-window aggregate. */
static bool isStateWindow(SStreamScanInfo* pInfo) {
  bool isState = (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == pInfo->windowSup.parentType);
  return isState;
}
5
54liuyao 已提交
1026

L
Liu Jicong 已提交
1027
/* True when the stream scan feeds any interval aggregate: single, semi or final. */
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
      return true;
    default:
      return false;
  }
}

/* True only for the plain (single-stage) stream interval aggregate. */
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  bool isSingle = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == pInfo->windowSup.parentType);
  return isSingle;
}

1037 1038 1039 1040
/* True for an interval aggregate whose sliding step differs from its interval length. */
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.sliding != pInfo->interval.interval;
}

1041
/* Copy the uint64 group id stored at row `rowIndex` of column `groupColIndex` into pInfo->groupId. */
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  const uint64_t*  pGroupIds = (const uint64_t*)pGroupCol->pData;
  ASSERT(rowIndex < pBlock->info.rows);
  pInfo->groupId = pGroupIds[rowIndex];
}

1048
/*
 * Re-target a table scan at time window *pWin, covering versions [0, ver], and rewind it.
 * Scan counters are reset, the current group is set back to "not started" (-1) and the existing
 * reader is closed, so the next scan reopens it.
 */
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t ver) {
  STableScanBase* pBase = &pTableScanInfo->base;

  pBase->cond.twindows = *pWin;
  pBase->cond.startVersion = 0;
  pBase->cond.endVersion = ver;

  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;

  pBase->readerAPI.tsdReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;
}

L
Liu Jicong 已提交
1058 1059
/*
 * Read the first data block of table `tbUid` within [startTs, endTs], considering versions up to
 * maxVersion (startVersion is set to -1). A temporary reader is opened and closed inside this call.
 *
 * The data is loaded into the scan operator's shared result block (pTableScanInfo->pResBlock),
 * which is returned when it holds rows; otherwise returns NULL. Reader errors long-jump to the task
 * environment after setting terrno.
 */
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};

  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
  SQueryTableDataCond cond = pTableScanInfo->base.cond;

  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  STsdbReader* pReader = NULL;
  int32_t      code = pAPI->tsdReader.tsdReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
                                                    (void**)&pReader, GET_TASKID(pTaskInfo), false, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  bool hasNext = false;
  code = pAPI->tsdReader.tsdNextDataBlock(pReader, &hasNext);
  if (code != TSDB_CODE_SUCCESS) {
    // the reader is local to this call: close it before unwinding, otherwise it leaks
    pAPI->tsdReader.tsdReaderClose(pReader);
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  if (hasNext) {
    /*SSDataBlock* p = */ pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, NULL);
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
    pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
  }

  pAPI->tsdReader.tsdReaderClose(pReader);
  qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);

  return pBlock->info.rows > 0 ? pBlock : NULL;
}

/*
 * Derive the partition group of the row written at `ts` for table `uid` (versions up to maxVersion)
 * by re-reading that row and evaluating the partition expressions on it. Returns 0 when no row is found.
 */
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPrevRow = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  bool         found = (pPrevRow != NULL) && (pPrevRow->info.rows != 0);
  if (!found) {
    return 0;
  }

  ASSERT(pPrevRow->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPrevRow, 0);
}

5
54liuyao 已提交
1113
/* Look up the group id of table `uid` in the table list owned by the underlying table scan operator. */
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  STableScanInfo* pScan = (STableScanInfo*)pInfo->pTableScanOp->info;
  STableListInfo* pTableList = pScan->base.pTableListInfo;
  return getTableGroupId(pTableList, uid);
}

5
54liuyao 已提交
1118 1119 1120 1121 1122 1123 1124 1125
/*
 * Resolve the group of a row: evaluate the partition expressions on the stored row when the
 * partition needs calculation, otherwise use the table-list group of `uid`.
 */
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1126
/*
 * Prepare the table scan for the next update range in pBlock, starting at row *pRowIndex.
 *
 * pBlock carries start/end ts, group id and calculate start/end ts columns. The range starts as
 * [start, end] of row *pRowIndex. Following rows of the same group are merged into it while their
 * start key equals the range start (extending the end) or their end key equals the range start.
 * *pRowIndex is left on the first row that was not merged. The group id (and, for sliding windows,
 * updateWin) is taken from the first row. Finally the table scan operator is reset to the merged
 * window, bounded by the update info's max data version, and re-opened.
 *
 * Returns false when no row is left to process, true otherwise.
 */
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  // covers the empty block as well, and never reads past the last row
  if ((*pRowIndex) >= pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  STimeWindow      win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        gpData = (uint64_t*)pGpCol->pData;
  uint64_t         groupId = gpData[*pRowIndex];

  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSKEY*           calStartData = (TSKEY*)pCalStartTsCol->pData;
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSKEY*           calEndData = (TSKEY*)pCalEndTsCol->pData;

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[*pRowIndex];
    pInfo->updateWin.ekey = calEndData[*pRowIndex];
  }
  (*pRowIndex)++;

  for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
    if (win.skey == startData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
      win.ekey = TMAX(win.ekey, endData[*pRowIndex]);
      continue;
    }

    if (win.skey == endData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
      win.skey = TMIN(win.skey, startData[*pRowIndex]);
      continue;
    }

    ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
           !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
    break;
  }

  qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey, pInfo->pUpdateInfo->maxDataVersion);
  resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1179
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1180
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1181
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1182
  dumyInfo.cur.pageId = -1;
1183
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1184 1185
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1186
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1187

5
54liuyao 已提交
1188
  while (1) {
1189 1190 1191
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1192
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1193 1194 1195 1196 1197
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1198
    }
5
54liuyao 已提交
1199

5
54liuyao 已提交
1200 1201 1202
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1203
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1204
    endWin = preWin;
5
54liuyao 已提交
1205
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1206 1207 1208 1209 1210 1211
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1212

L
Liu Jicong 已提交
1213
/*
 * Return the next block of the stream range (re)scan.
 *
 * Ranges are taken from pSDB starting at *pRowIndex via prepareRangeScan(). Each returned block has
 * passed the table scan's filter and belongs to pInfo->groupId; when the partition needs
 * calculation, rows are re-grouped and filtered one by one. The block's calWin is set to
 * pInfo->updateWin.
 *
 * When all ranges are consumed, pSDB is cleared, *pRowIndex and updateWin are reset, the table-scan
 * reader is closed and NULL is returned. tsColIndex is currently unused.
 */
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  qInfo("do stream range scan. windows index:%d", *pRowIndex);
  bool prepareRes = true;
  while (1) {
    SSDataBlock* pResult = NULL;
    pResult = doTableScan(pInfo->pTableScanOp);
    if (!pResult) {
      prepareRes = prepareRangeScan(pInfo, pSDB, pRowIndex);
      // scan next window data
      pResult = doTableScan(pInfo->pTableScanOp);
    }
    if (!pResult) {
      if (prepareRes) {
        continue;
      }
      // every range is consumed: reset the range block and release the reader
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
      pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
      pTableScanInfo->base.dataReader = NULL;
      return NULL;
    }

    doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
    if (pResult->info.rows == 0) {
      continue;
    }

    if (pInfo->partitionSup.needCalc) {
      // keep only the rows whose computed partition group is the group being recalculated
      SSDataBlock* tmpBlock = createOneDataBlock(pResult, true);
      if (tmpBlock == NULL) {
        T_LONG_JMP(pInfo->pTableScanOp->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
      }
      blockDataCleanup(pResult);
      for (int32_t i = 0; i < tmpBlock->info.rows; i++) {
        if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, tmpBlock, i) == pInfo->groupId) {
          for (int32_t j = 0; j < pInfo->pTableScanOp->exprSupp.numOfExprs; j++) {
            SColumnInfoData* pSrcCol = taosArrayGet(tmpBlock->pDataBlock, j);
            SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, j);
            bool             isNull = colDataIsNull(pSrcCol, tmpBlock->info.rows, i, NULL);
            char*            pSrcData = colDataGetData(pSrcCol, i);
            colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull);
          }
          pResult->info.rows++;
        }
      }

      blockDataDestroy(tmpBlock);

      if (pResult->info.rows > 0) {
        pResult->info.calWin = pInfo->updateWin;
        return pResult;
      }
    } else if (pResult->info.id.groupId == pInfo->groupId) {
      pResult->info.calWin = pInfo->updateWin;
      return pResult;
    }
  }
}
1270

1271
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
X
Xiaoyu Wang 已提交
1272
                                   SSessionKey* pKey) {
1273 1274 1275
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
X
Xiaoyu Wang 已提交
1276

1277 1278
  void* pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
  int32_t          code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
1279 1280 1281
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
D
dapan1121 已提交
1282 1283

  taosMemoryFree(pCur);
1284 1285 1286
  return code;
}

1287
/*
 * Translate the update ranges in pSrcBlock into session-window scan ranges in pDestBlock.
 *
 * For each source row, the session windows containing its start and end key are looked up in
 * the group resolved by getGroupIdByData(). If the end key is in no window, the preceding window
 * is used instead. Rows whose start or end window cannot be found are skipped. Each emitted row
 * holds [startWin.skey, endWin.ekey] and the group id; the uid and calculate-ts columns are NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity().
 */
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int64_t          ver = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver);
    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }
    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // fall back to the preceding window; it marks endWin invalid when none exists
      getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    }
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // window has been closed.
      qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
      continue;
    }

    // append at the next free output row: skipped source rows must not leave gaps in pDestBlock
    int32_t row = pDestBlock->info.rows;
    colDataSetVal(pDestStartCol, row, (const char*)&startWin.win.skey, false);
    colDataSetVal(pDestEndCol, row, (const char*)&endWin.win.ekey, false);

    colDataSetNULL(pDestUidCol, row);
    colDataSetVal(pDestGpCol, row, (const char*)&groupId, false);
    colDataSetNULL(pDestCalStartTsCol, row);
    colDataSetNULL(pDestCalEndTsCol, row);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1341 1342 1343 1344 1345 1346

/*
 * Builds the interval-window scan ranges for the rows of pSrcBlock and writes them into pDestBlock
 * (which is cleaned first).
 *
 * Every destination row holds:
 *   - START/END_TS: the sliding window returned by getSlidingWindow for a run of source rows,
 *   - CALCULATE_START_TS: the start ts of the first source row of that run,
 *   - CALCULATE_END_TS: the start ts of the row just before the index getSlidingWindow leaves in i
 *     (presumably the last row of the run — getSlidingWindow is assumed to advance i past it),
 *   - UID and GROUPID of the first source row of the run (a group id of 0 is resolved through
 *     getGroupIdByData at version - 1).
 *
 * When group ids have to be computed from row data (partitionSup.needCalc) and the first source row
 * covers a time range, pSrcBlock is rebuilt in place from the previous-version rows of that table
 * (readPreVersionData) before the ranges are generated.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of blockDataEnsureCapacity.
 */
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);

  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t ver = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
    // Only the first row's uid and time range are used: the source block is replaced by the
    // previous-version rows of that table inside [startTs, endTs], each with its own group id.
    uint64_t     srcUid = srcUidData[0];
    TSKEY        startTs = srcStartTsCol[0];
    TSKEY        endTs = srcEndTsCol[0];
    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, ver);
    blockDataCleanup(pSrcBlock);
    if (pPreRes == NULL) {
      // No previous-version rows: the rebuilt source block is empty, so there is no range to scan.
      return TSDB_CODE_SUCCESS;
    }

    printDataBlock(pPreRes, "pre res");
    int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    TSKEY*           preTs = (TSKEY*)pTsCol->pData;
    rows = pPreRes->info.rows;

    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
      appendOneRowToStreamSpecialBlock(pSrcBlock, preTs + i, preTs + i, &srcUid, &groupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }

  // The column buffers may have been reallocated while pSrcBlock was rebuilt; reload the pointers.
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  srcUidData = (uint64_t*)pSrcUidCol->pData;

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  // i is advanced by getSlidingWindow, not by the loop header.
  for (int32_t i = 0; i < rows;) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver);
    }
    TSKEY calStartTs = srcStartTsCol[i];
    colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
                                       pInfo->partitionSup.needCalc);
    TSKEY       calEndTs = srcStartTsCol[i - 1];
    colDataSetVal(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataSetVal(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1418

1419
/*
 * Copies every row of pSrcBlock into pDestBlock (cleaned first) as a delete-result row.
 *
 * Each row keeps its start/end ts and uid. A group id of 0 is resolved through getGroupIdByData at
 * version - 1. When a tbname expression is configured (tbnameCalSup.pExprInfo), the partition table
 * name stored for the group is looked up in the stream state and written as a varchar. Otherwise the
 * table-name column is set to NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of blockDataEnsureCapacity.
 */
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t ver = pSrcBlock->info.version - 1;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver);
    }

    // varchar buffer: length header + name. The extra byte stays 0, so strlen below is always bounded
    // even if the stored name fills all TSDB_TABLE_NAME_LEN bytes.
    char  tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN + 1] = {0};
    void* pTbName = NULL;  // NULL makes appendOneRowToStreamSpecialBlock write a NULL table name
    if (pInfo->tbnameCalSup.pExprInfo) {
      void* parTbname = NULL;
      pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
      if (parTbname != NULL) {
        memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
        varDataSetLen(tbname, strlen(varDataVal(tbname)));
        pInfo->stateStore.streamStateFreeVal(parTbname);
        if (varDataLen(tbname) > 0) {
          pTbName = tbname;
        }
      }
    }
    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     pTbName);
  }
  return TSDB_CODE_SUCCESS;
}

1461 1462 1463 1464
/*
 * Turns the rows of pSrcBlock into a STREAM_CLEAR block of scan ranges in pDestBlock.
 *
 * The range builder is chosen by window kind:
 *   - interval windows -> generateIntervalScanRange
 *   - session/state windows -> generateSessionScanRange
 *   - anything else -> generateDeleteResultBlock
 * The destination block metadata (type, version, dataLoad, ts window) is filled in even when the
 * builder reports an error. That error code is returned to the caller.
 */
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t (*buildRange)(SStreamScanInfo*, SSDataBlock*, SSDataBlock*) = generateDeleteResultBlock;
  if (isIntervalWindow(pInfo)) {
    buildRange = generateIntervalScanRange;
  } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
    buildRange = generateSessionScanRange;
  }

  int32_t result = buildRange(pInfo, pSrcBlock, pDestBlock);

  SDataBlockInfo* pDestInfo = &pDestBlock->info;
  pDestInfo->type = STREAM_CLEAR;
  pDestInfo->version = pSrcBlock->info.version;
  pDestInfo->dataLoad = 1;
  blockDataUpdateTsWindow(pDestBlock, 0);
  return result;
}

5
54liuyao 已提交
1477
/*
 * Prepares the create-table side output for pBlock.
 *
 * pInfo->pCreateTbRes is always cleaned first. When neither a tbname expression nor tag expressions
 * are configured, pBlock's partition table name is cleared. Otherwise appendCreateTableRow is asked
 * to compute the table name/tags for the block's group from row 0 of pBlock into pCreateTbRes.
 */
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  blockDataCleanup(pInfo->pCreateTbRes);
  if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
    pBlock->info.parTbName[0] = 0;
  } else {
    appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
                         pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, &pInfo->stateStore);
  }
}

1488 1489
/*
 * Appends one row to a stream "special" block (delete/update/clear ranges) at index info.rows and
 * then bumps info.rows.
 *
 * The start/end timestamps are written both to the START/END_TS columns and to the
 * CALCULATE_START/END_TS columns. pTbName == NULL stores a NULL table name. The caller must have
 * reserved capacity for the new row; no capacity check is done here.
 */
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  SArray* pCols = pBlock->pDataBlock;
  int32_t rowIdx = pBlock->info.rows;

  colDataSetVal(taosArrayGet(pCols, START_TS_COLUMN_INDEX), rowIdx, (const char*)pStartTs, false);
  colDataSetVal(taosArrayGet(pCols, END_TS_COLUMN_INDEX), rowIdx, (const char*)pEndTs, false);
  colDataSetVal(taosArrayGet(pCols, UID_COLUMN_INDEX), rowIdx, (const char*)pUid, false);
  colDataSetVal(taosArrayGet(pCols, GROUPID_COLUMN_INDEX), rowIdx, (const char*)pGp, false);
  colDataSetVal(taosArrayGet(pCols, CALCULATE_START_TS_COLUMN_INDEX), rowIdx, (const char*)pStartTs, false);
  colDataSetVal(taosArrayGet(pCols, CALCULATE_END_TS_COLUMN_INDEX), rowIdx, (const char*)pEndTs, false);

  bool noTableName = (pTbName == NULL);
  colDataSetVal(taosArrayGet(pCols, TABLE_NAME_COLUMN_INDEX), rowIdx, (const char*)pTbName, noTableName);

  pBlock->info.rows++;
}

1507
/*
 * Checks each row of pBlock against the stream update info to detect updates of already-seen data.
 *
 * Overdue rows (isOverdue) are skipped when igExpired is set. For every other row,
 * updateInfoIsUpdated is consulted. A row is reported when it is an update, or when it lies in a
 * window that is closed and already deleted. The latter is checked only for a single interval
 * window on a table that was inserted before.
 *
 * When out is true, pInfo->pUpdateDataRes is reset and each reported row is appended to it (with
 * group id 0). If partitionSup.needCalc is set and the window was closed, a second copy with the
 * data-derived group id is appended too. The result block type is STREAM_DELETE_DATA when needCalc
 * is set, STREAM_CLEAR otherwise. When out is false, the checks run but nothing is written.
 *
 * invertible is currently unused.
 */
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
    // One input row can append up to two output rows (see the closedWin/needCalc branch below).
    int32_t code = blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
    if (code != TSDB_CODE_SUCCESS) {
      // Appending without the reserved capacity would write past the column buffers. Keep consulting
      // the update info for every row (as the out == false path does) but emit nothing.
      qError("stream update check failed to reserve %" PRId64 " rows, code:%d", pBlock->info.rows * 2, code);
      out = false;
    }
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
  bool   tableInserted = pInfo->stateStore.updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);
  for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
    SResultRowInfo dummyInfo;
    dummyInfo.cur.pageId = -1;
    bool        isClosed = false;
    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    bool        overDue = isOverdue(tsCol[rowId], &pInfo->twAggSup);
    if (pInfo->igExpired && overDue) {
      continue;
    }

    if (tableInserted && overDue) {
      win = getActiveTimeWindow(NULL, &dummyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }
    // must check update info first.
    bool update = pInfo->stateStore.updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]);
    bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
                     isDeletedStreamWindow(&win, pBlock->info.id.groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore);
    if ((update || closedWin) && out) {
      qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
      uint64_t gpId = 0;
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid, &gpId,
                                       NULL);
      if (closedWin && pInfo->partitionSup.needCalc) {
        gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
        appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.id.uid,
                                         &gpId, NULL);
      }
    }
  }
  if (out && pInfo->pUpdateDataRes->info.rows > 0) {
    pInfo->pUpdateDataRes->info.version = pBlock->info.version;
    pInfo->pUpdateDataRes->info.dataLoad = 1;
    blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
    pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
  }
}
L
Liu Jicong 已提交
1553

1554
/*
 * Copies the columns required by the scan (matchInfo) from pBlock into pInfo->pRes as a
 * STREAM_NORMAL block.
 *
 * - pRes gets pBlock's row count, uid and version, plus the group id of pBlock's table from the
 *   table list.
 * - Required columns missing from pBlock are filled with NULLs.
 * - Pseudo (tag) columns are then computed. A TSDB_CODE_PAR_TABLE_NOT_EXIST error is ignored, since
 *   the table may have been dropped during the scan.
 * - When filter is true the operator's filter is applied.
 * - Finally calBlockTbName prepares the create-table output.
 *
 * Errors (allocation failure, pseudo-column failure) are raised with T_LONG_JMP on the task's env,
 * so this function only ever returns 0.
 */
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  int32_t code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    // colDataAssign below would write past undersized column buffers.
    T_LONG_JMP(pTaskInfo->env, code);
  }

  pInfo->pRes->info.rows = pBlock->info.rows;
  pInfo->pRes->info.id.uid = pBlock->info.id.uid;
  pInfo->pRes->info.type = STREAM_NORMAL;
  pInfo->pRes->info.version = pBlock->info.version;

  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
  pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);

  // todo extract method
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
      continue;
    }

    bool colExists = false;
    for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
      SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
      if (pResCol->info.colId == pColMatchInfo->colId) {
        SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
        colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
        colExists = true;
        break;
      }
    }

    // the required column does not exists in submit block, let's set it to be all null value
    if (!colExists) {
      SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
      colDataSetNNULL(pDst, 0, pBlockInfo->rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
                                  pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache);
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reset the error code.
    terrno = 0;
  }

  if (filter) {
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
  }

  pInfo->pRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

  calBlockTbName(pInfo, pInfo->pRes);
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
1619

L
Liu Jicong 已提交
1620
/*
 * Produces the next block for a TMQ queue scan, driven by the task's current offset type.
 *
 * TMQ_OFFSET__SNAPSHOT_DATA: returns the next non-empty table-scan block and records its uid and
 *   window end in the offset. When the table scan is exhausted, the tsdb reader is closed, the WAL
 *   reader is seeked to snapshotVer + 1 and the offset switches to TMQ_OFFSET__LOG at snapshotVer.
 * TMQ_OFFSET__LOG: reads WAL blocks, moving the offset to the WAL reader's curVersion - 1 after each
 *   read, and returns the first block that is non-empty after setBlockIntoRes filtering. Returns
 *   NULL when the WAL has nothing more.
 * Any other offset type is logged as an error and yields NULL.
 *
 * Also returns NULL when the task has been killed or the WAL seek fails.
 */
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
  SStreamScanInfo* pInfo = pOperator->info;
  const char*      id = GET_TASKID(pTaskInfo);

  qDebug("start to exec queue scan, %s", id);

  if (isTaskKilled(pTaskInfo)) {
    return NULL;
  }

  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
      tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
      return pResult;
    }

    // snapshot data exhausted: release the tsdb reader and continue from the WAL
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;

    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
    if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
      qError("queue scan failed to seek wal to ver %" PRId64 ", %s", pTaskInfo->streamInfo.snapshotVer + 1, id);
      return NULL;
    }

    tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
  }

  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
    while (1) {
      bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id);

      SSDataBlock*       pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
      struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);

      // curVersion move to next, so currentOffset = curVersion - 1
      tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion - 1);

      if (hasResult) {
        qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
               pTaskInfo->streamInfo.currentOffset.version);
        blockDataCleanup(pInfo->pRes);
        setBlockIntoRes(pInfo, pRes, true);
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
        // every row was filtered out: keep reading the WAL
      } else {
        qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
        return NULL;
      }
    }
  } else {
    qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
    return NULL;
  }
}

L
Liu Jicong 已提交
1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699
/*
 * Copies into pDst only those rows of the delete block pSrc whose uid belongs to a table queried by
 * this reader (tqReaderIsQueriedTable).
 *
 * START_TS, END_TS and UID are copied. GROUPID and CALCULATE_START/END_TS are set to NULL. pDst's
 * block info is then replaced by pSrc's, keeping pDst's own capacity and the filtered row count.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of blockDataEnsureCapacity (pDst is left untouched).
 */
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;
  int32_t    code = blockDataEnsureCapacity(pDst, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startCol = (TSKEY*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endCol = (TSKEY*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDstGpCol = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t j = 0;  // next output row
  for (int32_t i = 0; i < rows; i++) {
    if (pInfo->readerFn.tqReaderIsQueriedTable(pReader, uidCol[i])) {
      colDataSetVal(pDstStartCol, j, (const char*)&startCol[i], false);
      colDataSetVal(pDstEndCol, j, (const char*)&endCol[i], false);
      colDataSetVal(pDstUidCol, j, (const char*)&uidCol[i], false);

      colDataSetNULL(pDstGpCol, j);
      colDataSetNULL(pDstCalStartCol, j);
      colDataSetNULL(pDstCalEndCol, j);
      j++;
    }
  }

  uint32_t cap = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.rows = j;
  pDst->info.capacity = cap;

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734
// for partition by tag
/*
 * Fills the GROUPID column of pBlock with the group id looked up for each row's uid
 * (getGroupIdByUid).
 *
 * Does nothing when group ids are computed from row data (partitionSup.needCalc). In that case the
 * group id depends on the row, not only on the table.
 */
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;
  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataSetVal(pGpCol, i, (const char*)&groupId, false);
  }
}

5
54liuyao 已提交
1740
/*
 * Runs update detection for pBlock and selects the next scan mode from the result.
 *
 * Skipped entirely when update checking is disabled (igCheckUpdate) or no update info exists.
 * Otherwise it advances maxDataVersion, lets checkUpdateData fill pUpdateDataRes, and advances
 * twAggSup.maxTs to endKey. If update rows were produced, updateResIndex is reset and scanMode is
 * chosen by the block type:
 *   STREAM_CLEAR -> STREAM_SCAN_FROM_UPDATERES, STREAM_INVERT -> STREAM_SCAN_FROM_RES,
 *   STREAM_DELETE_DATA -> STREAM_SCAN_FROM_DELETE_DATA. Other types leave scanMode unchanged.
 */
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
  if (pInfo->igCheckUpdate || !pInfo->pUpdateInfo) {
    return;
  }

  pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
  checkUpdateData(pInfo, true, pBlock, true);
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);

  SSDataBlock* pUpdateRes = pInfo->pUpdateDataRes;
  if (pUpdateRes->info.rows <= 0) {
    return;
  }

  pInfo->updateResIndex = 0;
  switch (pUpdateRes->info.type) {
    case STREAM_CLEAR:
      pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      break;
    case STREAM_INVERT:
      pInfo->scanMode = STREAM_SCAN_FROM_RES;
      break;
    case STREAM_DELETE_DATA:
      pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
      break;
    default:
      break;
  }
}

1759 1760 1761 1762 1763 1764
//int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
//  int32_t len = updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
//  *pBuff = taosMemoryCalloc(1, len);
//  updateInfoSerialize(*pBuff, len, pInfo->pUpdateInfo);
//  return len;
//}
L
liuyao 已提交
1765 1766

// other properties are recovered from the execution plan
1767
void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) {
L
fix bug  
liuyao 已提交
1768
  if (!pBuff || len == 0) {
L
liuyao 已提交
1769 1770 1771
    return;
  }

1772 1773
  void* pUpInfo = pInfo->stateStore.updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
  int32_t      code = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo);
L
liuyao 已提交
1774 1775 1776 1777 1778
  if (code == TSDB_CODE_SUCCESS) {
    pInfo->pUpdateInfo = pUpInfo;
  }
}

L
Liu Jicong 已提交
1779 1780
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
1781 1782
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  const char*    id = GET_TASKID(pTaskInfo);
1783

1784
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
L
Liu Jicong 已提交
1785 1786
  SStreamScanInfo* pInfo = pOperator->info;

1787
  qDebug("stream scan started, %s", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
1788

1789 1790
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
L
Liu Jicong 已提交
1791
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1792
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
1793
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
H
Haojun Liao 已提交
1794 1795
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
1796
      qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion,
H
Haojun Liao 已提交
1797
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1798
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
1799
    } else {
H
Haojun Liao 已提交
1800 1801
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
1802
      qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion,
H
Haojun Liao 已提交
1803
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1804
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
1805
    }
L
Liu Jicong 已提交
1806

1807
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1808

H
Haojun Liao 已提交
1809
    pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
1810
    pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1811

L
Liu Jicong 已提交
1812 1813
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
L
Liu Jicong 已提交
1814
    pTaskInfo->streamInfo.recoverScanFinished = false;
L
Liu Jicong 已提交
1815 1816
  }

5
54liuyao 已提交
1817 1818
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
L
Liu Jicong 已提交
1819 1820 1821 1822 1823
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }
5
54liuyao 已提交
1824 1825 1826 1827 1828 1829 1830

    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        printDataBlock(pInfo->pRecoverRes, "scan recover");
        return pInfo->pRecoverRes;
      } break;
5
54liuyao 已提交
1831 1832 1833 1834
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
1835
        printDataBlock(pInfo->pUpdateRes, "recover update");
5
54liuyao 已提交
1836 1837
        return pInfo->pUpdateRes;
      } break;
1838 1839 1840 1841 1842 1843 1844 1845 1846
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        printDataBlock(pInfo->pDeleteDataRes, "recover delete");
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
1847 1848 1849 1850 1851 1852
      case STREAM_SCAN_FROM_DATAREADER_RANGE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
1853
          printDataBlock(pSDB, "scan recover update");
5
54liuyao 已提交
1854 1855 1856 1857 1858 1859
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
5
54liuyao 已提交
1860 1861 1862 1863 1864 1865
      default:
        break;
    }

    pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
    if (pInfo->pRecoverRes != NULL) {
L
Liu Jicong 已提交
1866
      pInfo->blockRecoverContiCnt++;
5
54liuyao 已提交
1867
      calBlockTbName(pInfo, pInfo->pRecoverRes);
L
liuyao 已提交
1868
      if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
5
54liuyao 已提交
1869
        if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
1870
          TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
1871 1872
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
        } else {
L
liuyao 已提交
1873
          pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pTaskInfo->streamInfo.fillHistoryVer2);
5
54liuyao 已提交
1874 1875
          doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
        }
1876
      }
5
54liuyao 已提交
1877 1878
      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
1879
        printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
5
54liuyao 已提交
1880 1881
        return pInfo->pCreateTbRes;
      }
1882

X
Xiaoyu Wang 已提交
1883
      qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
5
54liuyao 已提交
1884 1885
      printDataBlock(pInfo->pRecoverRes, "scan recover");
      return pInfo->pRecoverRes;
L
Liu Jicong 已提交
1886 1887
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
L
Liu Jicong 已提交
1888
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
1889
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1890

H
Haojun Liao 已提交
1891
    pTSInfo->base.dataReader = NULL;
1892

H
Haojun Liao 已提交
1893 1894
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
L
Liu Jicong 已提交
1895

L
Liu Jicong 已提交
1896
    pTaskInfo->streamInfo.recoverScanFinished = true;
L
Liu Jicong 已提交
1897 1898 1899
    return NULL;
  }

5
54liuyao 已提交
1900
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
5
54liuyao 已提交
1901
// TODO: refactor
L
Liu Jicong 已提交
1902
FETCH_NEXT_BLOCK:
L
Liu Jicong 已提交
1903
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
1904
    if (pInfo->validBlockIndex >= total) {
L
Liu Jicong 已提交
1905
      doClearBufferedBlocks(pInfo);
H
Haojun Liao 已提交
1906 1907 1908
      return NULL;
    }

1909 1910 1911
    int32_t  current = pInfo->validBlockIndex++;
    qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id);

L
Liu Jicong 已提交
1912 1913
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
5
54liuyao 已提交
1914
    if (pBlock->info.parTbName[0]) {
1915
      pAPI->stateStore.streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
1916
    }
1917

1918
    // TODO move into scan
5
54liuyao 已提交
1919 1920
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
1921
    pBlock->info.dataLoad = 1;
L
fix bug  
liuyao 已提交
1922
    if (pInfo->pUpdateInfo) {
1923
      pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
L
fix bug  
liuyao 已提交
1924
    }
1925
    blockDataUpdateTsWindow(pBlock, 0);
1926
    switch (pBlock->info.type) {
L
Liu Jicong 已提交
1927 1928 1929
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
1930 1931 1932
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
1933
        copyDataBlock(pInfo->pUpdateRes, pBlock);
L
liuyao 已提交
1934
        pInfo->updateResIndex = 0;
1935
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
1936
        pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
1937 1938
      } break;
      case STREAM_DELETE_DATA: {
1939
        printDataBlock(pBlock, "stream scan delete recv");
L
Liu Jicong 已提交
1940
        SSDataBlock* pDelBlock = NULL;
L
Liu Jicong 已提交
1941
        if (pInfo->tqReader) {
L
Liu Jicong 已提交
1942
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
L
Liu Jicong 已提交
1943
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
L
Liu Jicong 已提交
1944 1945
        } else {
          pDelBlock = pBlock;
L
Liu Jicong 已提交
1946
        }
5
54liuyao 已提交
1947 1948
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
5
54liuyao 已提交
1949 1950 1951 1952 1953 1954
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
1955
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
L
Liu Jicong 已提交
1956
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
1957
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
L
Liu Jicong 已提交
1958
          printDataBlock(pDelBlock, "stream scan delete result");
H
Haojun Liao 已提交
1959 1960
          blockDataDestroy(pDelBlock);

L
Liu Jicong 已提交
1961 1962 1963 1964 1965
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1966 1967 1968
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
L
Liu Jicong 已提交
1969
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
1970 1971 1972
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
L
Liu Jicong 已提交
1973 1974 1975 1976
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
L
Liu Jicong 已提交
1977
          if (pInfo->pDeleteDataRes->info.rows > 0) {
5
54liuyao 已提交
1978
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
L
Liu Jicong 已提交
1979 1980 1981 1982
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1983
        }
1984 1985 1986
      } break;
      default:
        break;
5
54liuyao 已提交
1987
    }
1988
    // printDataBlock(pBlock, "stream scan recv");
1989
    return pBlock;
L
Liu Jicong 已提交
1990
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
1991
    qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
5
54liuyao 已提交
1992 1993 1994
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
5
54liuyao 已提交
1995
        doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
5
54liuyao 已提交
1996 1997 1998
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
1999 2000 2001
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
5
54liuyao 已提交
2002
      } break;
2003
      case STREAM_SCAN_FROM_DELETE_DATA: {
2004 2005 2006 2007 2008 2009 2010
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
2011 2012 2013 2014 2015 2016 2017 2018 2019 2020
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
2021
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
5
54liuyao 已提交
2022 2023
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
L
liuyao 已提交
2024
          printDataBlock(pSDB, "stream scan update");
L
Liu Jicong 已提交
2025
          calBlockTbName(pInfo, pSDB);
5
54liuyao 已提交
2026 2027
          return pSDB;
        }
2028
        blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
2029 2030 2031 2032
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
2033
    }
2034

2035
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
5
54liuyao 已提交
2036
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
2037 2038
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
5
54liuyao 已提交
2039 2040
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
2041
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
2042
      pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
2043
      return pInfo->pUpdateRes;
5
54liuyao 已提交
2044
    }
5
54liuyao 已提交
2045

2046 2047
    SSDataBlock*    pBlock = pInfo->pRes;
    SDataBlockInfo* pBlockInfo = &pBlock->info;
H
Haojun Liao 已提交
2048
    int32_t         totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
2049

L
Liu Jicong 已提交
2050
  NEXT_SUBMIT_BLK:
2051
    while (1) {
2052
      if (pInfo->readerFn.tqReaderCurrentBlockConsumed(pInfo->tqReader)) {
2053
        if (pInfo->validBlockIndex >= totalBlocks) {
2054
          pAPI->stateStore.updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
2055 2056 2057 2058 2059 2060 2061 2062 2063 2064
          doClearBufferedBlocks(pInfo);

          qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
          return NULL;
        }

        int32_t      current = pInfo->validBlockIndex++;
        SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);

        qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id);
2065
        if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
2066 2067 2068 2069
          qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id);
          continue;
        }
      }
2070

2071
      blockDataCleanup(pBlock);
2072

2073 2074
      while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
        SSDataBlock* pRes = NULL;
2075

2076
        int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
2077 2078
        qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows,
               id);
H
Haojun Liao 已提交
2079

2080
        if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
2081
          qDebug("retrieve data failed, try next block in submit block, %s", id);
2082 2083 2084
          continue;
        }

2085
        setBlockIntoRes(pInfo, pRes, false);
2086

5
54liuyao 已提交
2087 2088
        if (pInfo->pCreateTbRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
2089
          qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id);
5
54liuyao 已提交
2090
          return pInfo->pCreateTbRes;
2091 2092
        }

2093 2094 2095 2096
        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
        doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
        pBlock->info.dataLoad = 1;
        blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
2097

2098 2099
        qDebug("%" PRId64 " rows in datablock, update res:%" PRId64 " %s", pBlockInfo->rows,
               pInfo->pUpdateDataRes->info.rows, id);
2100 2101 2102
        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
          break;
        }
2103
      }
H
Haojun Liao 已提交
2104

2105
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
5
54liuyao 已提交
2106
        break;
2107 2108
      } else {
        continue;
5
54liuyao 已提交
2109
      }
H
Haojun Liao 已提交
2110 2111 2112 2113
    }

    // record the scan action.
    pInfo->numOfExec++;
2114
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
H
Haojun Liao 已提交
2115

2116
    qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id);
L
Liu Jicong 已提交
2117
    if (pBlockInfo->rows > 0) {
2118
      return pBlock;
L
Liu Jicong 已提交
2119
    }
2120 2121 2122 2123 2124 2125

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
H
Haojun Liao 已提交
2126
  }
2127

2128
  return NULL;
H
Haojun Liao 已提交
2129 2130
}

H
Haojun Liao 已提交
2131
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2132 2133 2134
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2135 2136 2137
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2138 2139 2140 2141 2142 2143
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2144
/*
 * next() of the raw (tmq snapshot) scan operator.
 *
 * What happens depends on pTaskInfo->streamInfo.currentOffset.type:
 *  - TMQ_OFFSET__SNAPSHOT_DATA: if the tsdb reader has another block, return it and move
 *    the offset to that block's uid / window end key. Otherwise ask the snapshot for the
 *    next table: prepare a data scan on it, or switch to the wal (at the snapshot version)
 *    when there are no tables left. In that case NULL is returned.
 *  - TMQ_OFFSET__SNAPSHOT_META: read the next table info from the snapshot. With
 *    sContext->queryMeta it is published through streamInfo.metaRsp; otherwise the scan is
 *    switched to data. NULL is returned either way.
 *  - anything else: NULL.
 *
 * metaRsp is reset on every call: a non-zero metaRspLen afterwards marks the result as
 * meta instead of data. Reader failures and task kill are raised with T_LONG_JMP.
 * NOTE: this operator never checks whether its status is already done.
 */
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*        pAPI = &pTaskInfo->storageAPI;
  SStreamRawScanInfo* pInfo = pOperator->info;

  // use metaRspLen != 0 to judge if data is meta
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;

  qDebug("tmqsnap doRawScan called");

  switch (pTaskInfo->streamInfo.currentOffset.type) {
    case TMQ_OFFSET__SNAPSHOT_DATA: {
      if (pInfo->dataReader != NULL) {
        bool    hasNext = false;
        int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->dataReader, &hasNext);
        if (code != TSDB_CODE_SUCCESS) {
          pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
          T_LONG_JMP(pTaskInfo->env, code);
        }

        if (hasNext) {
          if (isTaskKilled(pTaskInfo)) {
            pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
            T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
          }

          SSDataBlock* pDataBlock = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, NULL);
          if (pDataBlock == NULL) {
            T_LONG_JMP(pTaskInfo->env, terrno);
          }

          qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pDataBlock->info.id.uid);
          tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pDataBlock->info.id.uid,
                              pDataBlock->info.window.ekey);
          return pDataBlock;
        }
      }

      // Nothing left in the current table (or no reader yet): pick the next table from the snapshot.
      SMetaTableInfo nextTable = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext);
      STqOffsetVal   nextOffset = {0};
      if (nextTable.uid != 0) {
        tqOffsetResetToData(&nextOffset, nextTable.uid, INT64_MIN);
        qDebug("tmqsnap change get data uid:%" PRId64 "", nextTable.uid);
      } else {  // read snapshot done, change to get data from wal
        qDebug("tmqsnap read snapshot done, change to get data from wal");
        tqOffsetResetToLog(&nextOffset, pInfo->sContext->snapVersion);
      }

      qStreamPrepareScan(pTaskInfo, &nextOffset, pInfo->sContext->subType);
      tDeleteSchemaWrapper(nextTable.schema);
      return NULL;
    }

    case TMQ_OFFSET__SNAPSHOT_META: {
      SSnapContext* pSnap = pInfo->sContext;
      void*         pMeta = NULL;
      int32_t       metaLen = 0;
      int16_t       msgType = 0;
      int64_t       tableUid = 0;

      if (pAPI->snapshotFn.getTableInfoFromSnapshot(pSnap, &pMeta, &metaLen, &msgType, &tableUid) < 0) {
        qError("tmqsnap getTableInfoFromSnapshot error");
        taosMemoryFreeClear(pMeta);
        return NULL;
      }

      if (pSnap->queryMeta) {
        tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, tableUid);
        pTaskInfo->streamInfo.metaRsp.resMsgType = msgType;
        pTaskInfo->streamInfo.metaRsp.metaRspLen = metaLen;
        pTaskInfo->streamInfo.metaRsp.metaRsp = pMeta;
      } else {  // change to get data next poll request
        STqOffsetVal dataOffset = {0};
        tqOffsetResetToData(&dataOffset, 0, INT64_MIN);
        qStreamPrepareScan(pTaskInfo, &dataOffset, pInfo->sContext->subType);
      }
      return NULL;
    }

    default:
      return NULL;
  }
}

wmmhello's avatar
wmmhello 已提交
2221
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2222
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
2223
  pRawScan->pAPI->tsdReader.tsdReaderClose(pRawScan->dataReader);
2224
  pRawScan->pAPI->snapshotFn.destroySnapshot(pRawScan->sContext);
2225
  tableListDestroy(pRawScan->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2226 2227 2228
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2229 2230 2231
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2232
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2233 2234 2235 2236 2237
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2238 2239
  int32_t code = TSDB_CODE_SUCCESS;

2240
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2241
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2242
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2243 2244
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2245 2246
  }

2247
  pInfo->pTableListInfo = tableListCreate();
wmmhello's avatar
wmmhello 已提交
2248
  pInfo->vnode = pHandle->vnode;
H
Haojun Liao 已提交
2249
  pInfo->pAPI = &pTaskInfo->storageAPI;
wmmhello's avatar
wmmhello 已提交
2250

2251
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2252 2253
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2254

2255
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2256
  return pOperator;
H
Haojun Liao 已提交
2257

L
Liu Jicong 已提交
2258
_end:
H
Haojun Liao 已提交
2259 2260 2261 2262
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2263 2264
}

2265
static void destroyStreamScanOperatorInfo(void* param) {
2266
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
2267

2268
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
2269
    destroyOperator(pStreamScan->pTableScanOp);
2270
  }
2271

2272
  if (pStreamScan->tqReader) {
2273
    pStreamScan->readerFn.tqReaderClose(pStreamScan->tqReader);
2274
  }
H
Haojun Liao 已提交
2275 2276
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2277
  }
C
Cary Xu 已提交
2278 2279
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2280
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2281
  }
C
Cary Xu 已提交
2282

L
Liu Jicong 已提交
2283
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
5
54liuyao 已提交
2284
  cleanupExprSupp(&pStreamScan->tagCalSup);
L
Liu Jicong 已提交
2285

2286
  pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo);
2287 2288 2289 2290
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2291
  blockDataDestroy(pStreamScan->pUpdateDataRes);
5
54liuyao 已提交
2292
  blockDataDestroy(pStreamScan->pCreateTbRes);
2293 2294 2295 2296
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2297
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2298
                                            STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2299
  SArray*          pColIds = NULL;
2300 2301
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2302 2303
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
  const char* idstr = pTaskInfo->id.str;
2304

H
Haojun Liao 已提交
2305
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2306
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2307
    tableListDestroy(pTableListInfo);
2308
    goto _error;
H
Haojun Liao 已提交
2309 2310
  }

2311
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2312
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2313

2314
  pInfo->pTagCond = pTagCond;
2315
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2316

2317
  int32_t numOfCols = 0;
2318 2319
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2320
  if (code != TSDB_CODE_SUCCESS) {
2321
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2322 2323
    goto _error;
  }
2324

H
Haojun Liao 已提交
2325
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2326
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2327
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2328
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2329 2330

    int16_t colId = id->colId;
2331
    taosArrayPush(pColIds, &colId);
2332
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2333
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2334
    }
H
Haojun Liao 已提交
2335 2336
  }

L
Liu Jicong 已提交
2337 2338 2339 2340
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2341
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2342 2343
      goto _error;
    }
2344

L
Liu Jicong 已提交
2345 2346
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
2347
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1, &pTaskInfo->storageAPI.functionStore) != 0) {
2348
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2349 2350 2351 2352
      goto _error;
    }
  }

2353 2354
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
5
54liuyao 已提交
2355
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
2356 2357
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2358
      tableListDestroy(pTableListInfo);
2359 2360
      goto _error;
    }
2361
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore) != 0) {
2362
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2363
      tableListDestroy(pTableListInfo);
2364 2365 2366 2367
      goto _error;
    }
  }

L
Liu Jicong 已提交
2368
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
H
Haojun Liao 已提交
2369
  if (pInfo->pBlockLists == NULL) {
2370
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2371
    tableListDestroy(pTableListInfo);
2372
    goto _error;
H
Haojun Liao 已提交
2373 2374
  }

5
54liuyao 已提交
2375
  if (pHandle->vnode) {
2376
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
L
Liu Jicong 已提交
2377
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2378
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2379
      pTSInfo->base.cond.endVersion = pHandle->version;
2380
    }
L
Liu Jicong 已提交
2381

2382
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2383
    int32_t        num = 0;
2384
    tableListGetGroupList(pTableListInfo, 0, &pList, &num);
2385

2386
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2387
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2388
      pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
2389 2390
    }

L
Liu Jicong 已提交
2391 2392
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
2393
      pInfo->tqReader = pAPI->tqReaderFn.tqReaderOpen(pHandle->vnode);
L
Liu Jicong 已提交
2394
      ASSERT(pInfo->tqReader);
2395
    } else {
L
Liu Jicong 已提交
2396 2397
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2398 2399
    }

2400
    pInfo->pUpdateInfo = NULL;
2401
    pInfo->pTableScanOp = pTableScanOp;
2402
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
2403
      pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
2404
    }
L
Liu Jicong 已提交
2405

L
Liu Jicong 已提交
2406
    pInfo->readHandle = *pHandle;
L
Liu Jicong 已提交
2407
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
5
54liuyao 已提交
2408 2409
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
L
Liu Jicong 已提交
2410

L
Liu Jicong 已提交
2411
    // set the extract column id to streamHandle
2412
    pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds);
2413
    SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
2414
    code = pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList, idstr);
L
Liu Jicong 已提交
2415 2416 2417 2418
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
2419

L
Liu Jicong 已提交
2420
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2421
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2422 2423
  } else {
    taosArrayDestroy(pColIds);
2424
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2425
    pColIds = NULL;
5
54liuyao 已提交
2426 2427
  }

2428 2429 2430 2431 2432
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2433 2434 2435 2436 2437
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2438
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2439
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2440
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2441
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2442
  pInfo->groupId = 0;
2443
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2444
  pInfo->pStreamScanOp = pOperator;
2445
  pInfo->deleteDataIndex = 0;
2446
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2447
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2448
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2449
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2450
  pInfo->partitionSup.needCalc = false;
5
54liuyao 已提交
2451 2452
  pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
  pInfo->igExpired = pTableScanNode->igExpired;
2453
  pInfo->twAggSup.maxTs = INT64_MIN;
L
liuyao 已提交
2454
  pInfo->pState = NULL;
2455 2456
  pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
  pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
L
Liu Jicong 已提交
2457

L
fix bug  
liuyao 已提交
2458 2459 2460 2461
  // for stream
  if (pTaskInfo->streamInfo.pState) {
    void*   buff = NULL;
    int32_t len = 0;
2462 2463
    pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len);
    streamScanOperatorDecode(buff, len, pInfo);
dengyihao's avatar
dengyihao 已提交
2464
    taosMemoryFree(buff);
L
fix bug  
liuyao 已提交
2465
  }
L
liuyao 已提交
2466

L
fix bug  
liuyao 已提交
2467
  setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
L
Liu Jicong 已提交
2468
                  pTaskInfo);
2469
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2470

2471
  __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2472 2473
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2474

H
Haojun Liao 已提交
2475
  return pOperator;
2476

L
Liu Jicong 已提交
2477
_error:
H
Haojun Liao 已提交
2478 2479 2480 2481 2482 2483 2484 2485
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2486 2487
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2488 2489
}

2490
/*
 * Write row `count` of pRes for the table at pInfo->curPos of the tag scan's table list.
 *
 * Each output expression is filled in one of two ways:
 *  - a scan pseudo-column function (e.g. tbname) receives the table name as a var string;
 *  - any other expression receives the tag value whose column id is the expression's first
 *    parameter, or NULL when the tag is absent or is a JSON null.
 *
 * If the table entry cannot be loaded, the meta reader is cleared and the error is raised
 * with T_LONG_JMP.
 */
static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr, SStorageAPI* pAPI) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprs = pOperator->exprSupp.pExprInfo;

  STableKeyInfo* pKey = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
  int32_t        code = pAPI->metaReaderFn.getTableEntryByUid(mr, pKey->uid);
  tDecoderClear(&mr->coder);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKey->uid, tstrerror(terrno),
           GET_TASKID(pTaskInfo));
    pAPI->metaReaderFn.clearReader(mr);
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  char varName[512];
  for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
    SExprInfo*       pExpr = &pExprs[j];
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExpr->base.resSchema.slotId);

    // refactor later
    if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
      STR_TO_VARSTR(varName, mr->me.name);
      colDataSetVal(pDst, count, varName, false);
      continue;
    }

    // it is a tag value
    STagVal tagVal = {0};
    tagVal.cid = pExpr->base.pParam[0].pCol->colId;
    const char* pTag = pAPI->metaFn.extractTagVal(mr->me.ctbEntry.pTags, pDst->info.type, &tagVal);

    bool  isJson = (pDst->info.type == TSDB_DATA_TYPE_JSON);
    bool  converted = !isJson && pTag != NULL;  // non-JSON tags are converted to a data buffer
    char* pData = converted ? tTagValToData((const STagVal*)pTag, false) : (char*)pTag;
    bool  isNull = (pData == NULL) || (isJson && tTagIsJsonNull(pData));
    colDataSetVal(pDst, count, pData, isNull);

    // a converted var-length tag is a separate allocation
    if (converted && IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type) && pData != NULL) {
      taosMemoryFree(pData);
    }
  }
}

2535
/*
 * next() of the tag scan operator.
 *
 * It emits one row per table of pInfo->pTableListInfo, starting at pInfo->curPos, with at
 * most pOperator->resultInfo.capacity rows per call. The operator and the task are marked
 * completed once every table has been visited.
 *
 * When pInfo->pSlimit is set, the block is cut after each table past the slimit offset, so
 * that each table forms its own group (groupId = calcGroupId of the table name). The
 * operator completes once the slimit window is reached.
 *
 * NOTE(review): when the `continue` for curPos < pSlimit->offset is taken, that table's
 * row has already been written and counted, so it is not dropped from the block. Confirm
 * whether the offset is meant to be applied elsewhere.
 *
 * Returns pInfo->pRes, or NULL when no row was produced.
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;
  STagScanInfo*  pInfo = pOperator->info;
  SSDataBlock*   pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  int32_t size = tableListGetSize(pInfo->pTableListInfo);
  if (size == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  int32_t     count = 0;
  SMetaReader mr = {0};
  pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0, &pAPI->metaFn);

  while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
    doTagScanOneTable(pOperator, pRes, count, &mr, pAPI);
    ++count;
    if (++pInfo->curPos >= size) {
      setOperatorCompleted(pOperator);
    }
    // each table with tbname is a group, hence its own block, but only group when slimit exists for performance reason.
    if (pInfo->pSlimit != NULL) {
      if (pInfo->curPos < pInfo->pSlimit->offset) {
        continue;
      }
      pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name));
      if (pInfo->curPos >= (pInfo->pSlimit->offset + pInfo->pSlimit->limit) - 1) {
        setOperatorCompleted(pOperator);
      }
      break;
    }
  }

  pAPI->metaReaderFn.clearReader(&mr);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = count;
  pOperator->resultInfo.totalRows += count;

  return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

2591
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2592 2593
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2594
  taosArrayDestroy(pInfo->matchInfo.pList);
2595
  pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
D
dapan1121 已提交
2596
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2597 2598
}

S
slzhou 已提交
2599
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
X
Xiaoyu Wang 已提交
2600
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
2601
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
2602 2603 2604 2605 2606
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2607 2608 2609 2610
  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
2611
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
2612 2613 2614
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2615

H
Haojun Liao 已提交
2616 2617
  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
2618 2619 2620
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2621

2622
  pInfo->pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
2623
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2624 2625
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
2626
  pInfo->pSlimit = (SLimitNode*)pPhyNode->node.pSlimit; //TODO: slimit now only indicate group
2627

L
Liu Jicong 已提交
2628 2629
  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2630
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2631 2632
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

L
Liu Jicong 已提交
2633 2634
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
2635 2636

  return pOperator;
2637

2638
_error:
H
Haojun Liao 已提交
2639 2640 2641 2642 2643
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2644

dengyihao's avatar
dengyihao 已提交
2645
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2646 2647 2648 2649
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
2650 2651
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;

dengyihao's avatar
opt mem  
dengyihao 已提交
2652 2653
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
D
dapan1121 已提交
2654
  int32_t                         code = 0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2655

H
Haojun Liao 已提交
2656
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2657

L
Liu Jicong 已提交
2658
  int64_t      st = taosGetTimestampUs();
2659
  void*        p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2660
  SReadHandle* pHandle = &pInfo->base.readHandle;
D
dapan1121 已提交
2661
  if (NULL == source->dataReader) {
2662
    code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL);
D
dapan1121 已提交
2663 2664 2665
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
dengyihao's avatar
dengyihao 已提交
2666
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2667

D
dapan1121 已提交
2668
  pInfo->base.dataReader = source->dataReader;
H
Haojun Liao 已提交
2669
  STsdbReader* reader = pInfo->base.dataReader;
X
Xiaoyu Wang 已提交
2670
  bool         hasNext = false;
2671
  qTrace("tsdb/read-table-data: %p, enter next reader", reader);
D
dapan1121 已提交
2672 2673

  while (true) {
2674
    code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
D
dapan1121 已提交
2675
    if (code != 0) {
2676
      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
D
dapan1121 已提交
2677 2678 2679 2680 2681 2682 2683
      pInfo->base.dataReader = NULL;
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
2684

H
Haojun Liao 已提交
2685
    if (isTaskKilled(pTaskInfo)) {
2686
      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
D
dapan1121 已提交
2687
      pInfo->base.dataReader = NULL;
2688
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2689 2690 2691
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2692
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2693 2694 2695 2696
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2697
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2698 2699 2700 2701
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2702 2703

    uint32_t status = 0;
2704
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2705
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2706
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2707
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2708 2709
    }

2710 2711 2712 2713
    if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
      break;
    }

dengyihao's avatar
opt mem  
dengyihao 已提交
2714 2715 2716 2717 2718
    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

2719
    pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2720

H
Haojun Liao 已提交
2721
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2722
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2723

2724
    qTrace("tsdb/read-table-data: %p, close reader", reader);
H
Haojun Liao 已提交
2725
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2726 2727
    return pBlock;
  }
H
Haojun Liao 已提交
2728

D
dapan1121 已提交
2729 2730
  pAPI->tsdReader.tsdReaderClose(source->dataReader);
  source->dataReader = NULL;
H
Haojun Liao 已提交
2731
  pInfo->base.dataReader = NULL;
D
dapan1121 已提交
2732 2733
  blockDataDestroy(source->inputBlock);
  source->inputBlock = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2734 2735 2736
  return NULL;
}

2737 2738 2739
/*
 * Build the sort key list used by the table-merge scan: a single SBlockOrderInfo on the column whose
 * colId is PRIMARYKEY_TIMESTAMP_COL_ID, in the requested `order`, with NULL_ORDER_FIRST.
 *
 * The slot is that entry's dstSlotId in colMatchInfo (the last matching entry wins, slot 0 if none
 * matches). The returned SArray is newly created and owned by the caller.
 */
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsSlotId = 0;

  // scanning from the back and stopping at the first hit picks the same entry as "last match wins"
  for (int32_t idx = (int32_t)taosArrayGetSize(colMatchInfo) - 1; idx >= 0; --idx) {
    SColMatchItem* pItem = taosArrayGet(colMatchInfo, idx);
    if (pItem->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      tsSlotId = pItem->dstSlotId;
      break;
    }
  }

  SBlockOrderInfo orderInfo = {.order = order, .slotId = tsSlotId, .nullFirst = NULL_ORDER_FIRST};
  SArray*         pOrderList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  taosArrayPush(pOrderList, &orderInfo);
  return pOrderList;
}

H
Haojun Liao 已提交
2757
/*
 * Copy a query condition so that dst owns a private copy of the column list.
 *
 * dst receives a member-wise copy of src. dst->colList is then replaced by a newly allocated array
 * holding src->numOfCols entries copied from src->colList. Any other pointer members are copied
 * shallowly.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the column list could not be allocated
 * (dst->colList is NULL in that case).
 */
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
  *dst = *src;

  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  if (src->numOfCols <= 0) {
    return TSDB_CODE_SUCCESS;  // nothing to copy; a zero-size allocation may legitimately be NULL
  }
  if (dst->colList == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(dst->colList, src->colList, (size_t)src->numOfCols * sizeof(SColumnInfo));
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2765

2766
/*
 * Prepare the multi-way merge for the group that starts at pInfo->tableStartIndex.
 *
 * - Sets pInfo->tableEndIndex to the last consecutive table whose groupId equals pInfo->groupId.
 * - Creates pInfo->pSortHandle (SORT_MULTISOURCE_MERGE) with getTableDataBlockImpl as its fetch callback.
 * - Registers one sort source per table of the group. Each source has its own input block and its
 *   own copy of pInfo->base.cond, stored in pInfo->queryConds.
 *
 * Errors leave via T_LONG_JMP; otherwise TSDB_CODE_SUCCESS is returned.
 */
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  int32_t              code = TSDB_CODE_SUCCESS;

  // find the last table belonging to the current group
  {
    size_t  numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < numOfTables; ++i) {
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  pInfo->base.dataReader = NULL;

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // Merge fan-in: (TSDB_MAX_BYTES_PER_ROW * 2) / rowSize, clamped to [2, 128] and rounded down to a
  // power of two. One additional page is reserved for the merge result.
  int32_t rowSize = pInfo->pResBlock->info.rowSize;
  int32_t kWay = (rowSize > 0) ? (TSDB_MAX_BYTES_PER_ROW * 2) / rowSize : 128;  // guard against division by zero
  if (kWay >= 128) {
    kWay = 128;
  } else if (kWay <= 2) {
    kWay = 2;
  } else {
    int32_t pow2 = 2;
    while (pow2 * 2 <= kWay) {
      pow2 *= 2;
    }
    kWay = pow2;
  }

  pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  for (int32_t i = 0; i < numOfTable; ++i) {
    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    taosArrayPush(pInfo->sortSourceParams, &param);

    SQueryTableDataCond cond;
    code = dumpQueryTableCond(&pInfo->base.cond, &cond);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
    taosArrayPush(pInfo->queryConds, &cond);
  }

  for (int32_t i = 0; i < numOfTable; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
    // the source only references the param stored in sortSourceParams; it does not own it
    ps->param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->onlyRef = true;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    // propagate the code tsortOpen returned; terrno may be unset or stale here
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Tear down the per-group state built by startGroupTableMergeScan.
 *
 * The sort handle's statistics are folded into pInfo->sortExecInfo: sortMethod and sortBuffer are
 * overwritten, while loops, readBytes and writeBytes are summed. Then, in order, this function
 * releases each source's input block and reader, clears sortSourceParams, destroys the sort handle,
 * frees the per-table query conditions, and resets the limit state for the next group.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SStorageAPI*         pAPI = &pOperator->pTaskInfo->storageAPI;
  int32_t              nTables = taosArrayGetSize(pInfo->queryConds);

  SSortExecInfo  groupStat = tsortGetSortExecInfo(pInfo->pSortHandle);
  SSortExecInfo* pTotal = &pInfo->sortExecInfo;
  pTotal->sortMethod = groupStat.sortMethod;
  pTotal->sortBuffer = groupStat.sortBuffer;
  pTotal->loops += groupStat.loops;
  pTotal->readBytes += groupStat.readBytes;
  pTotal->writeBytes += groupStat.writeBytes;

  for (int32_t idx = 0; idx < nTables; ++idx) {
    STableMergeScanSortSourceParam* pSrc = taosArrayGet(pInfo->sortSourceParams, idx);
    blockDataDestroy(pSrc->inputBlock);
    pAPI->tsdReader.tsdReaderClose(pSrc->dataReader);
    pSrc->dataReader = NULL;
  }
  taosArrayClear(pInfo->sortSourceParams);

  tsortDestroySortHandle(pInfo->pSortHandle);
  pInfo->pSortHandle = NULL;

  for (int32_t idx = 0; idx < nTables; ++idx) {
    SQueryTableDataCond* pCond = taosArrayGet(pInfo->queryConds, idx);
    taosMemoryFree(pCond->colList);
  }
  taosArrayDestroy(pInfo->queryConds);
  pInfo->queryConds = NULL;

  resetLimitInfoForNextGroup(&pInfo->limitInfo);
  return TSDB_CODE_SUCCESS;
}

2878 2879
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2880 2881
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2882 2883 2884
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2885
  blockDataCleanup(pResBlock);
2886 2887

  while (1) {
2888
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2889 2890 2891 2892
    if (pTupleHandle == NULL) {
      break;
    }

2893 2894
    appendOneRowToDataBlock(pResBlock, pTupleHandle);
    if (pResBlock->info.rows >= capacity) {
2895 2896 2897 2898
      break;
    }
  }

D
dapan1121 已提交
2899 2900 2901 2902 2903
  if (tsortIsClosed(pHandle)) {
    terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
    T_LONG_JMP(pOperator->pTaskInfo->env, terrno);
  }

2904
  bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
D
dapan1121 已提交
2905
  qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
2906
         pInfo->limitInfo.numOfOutputRows);
2907

2908
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
2909 2910 2911 2912 2913 2914 2915 2916 2917 2918 2919 2920
}

/*
 * `next` function of the table merge scan operator.
 *
 * On the first call it starts the group of the first table in the table list. Each call returns
 * the next sorted block of the current group, tagged with pInfo->groupId. When a group runs dry,
 * it is stopped and the following group is started. Returns NULL once the table list is empty or
 * every group has been consumed; the operator is then marked completed.
 */
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  STableMergeScanInfo* pInfo = pOperator->info;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);

  // first invocation: set up the group of the first table
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;

    if (numOfTables == 0) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pInfo->tableStartIndex = 0;
    STableKeyInfo* pFirstTable = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
    pInfo->groupId = pFirstTable->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pInfo->tableStartIndex < numOfTables) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    SSDataBlock* pSorted = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock,
                                                            pOperator->resultInfo.capacity, pOperator);
    if (pSorted != NULL) {
      pSorted->info.id.groupId = pInfo->groupId;
      pOperator->resultInfo.totalRows += pSorted->info.rows;
      return pSorted;
    }

    // Data of this group are all dumped, let's try the next group
    stopGroupTableMergeScan(pOperator);
    if (pInfo->tableEndIndex >= numOfTables - 1) {
      setOperatorCompleted(pOperator);
      break;
    }

    pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
    pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
    startGroupTableMergeScan(pOperator);
    resetLimitInfoForNextGroup(&pInfo->limitInfo);
  }

  return NULL;
}

2967
void destroyTableMergeScanOperatorInfo(void* param) {
2968
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2969
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2970

dengyihao's avatar
dengyihao 已提交
2971 2972 2973
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2974 2975
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
2976
    pTableScanInfo->base.readerAPI.tsdReaderClose(p->dataReader);
D
dapan1121 已提交
2977
    p->dataReader = NULL;
2978
  }
H
Haojun Liao 已提交
2979

2980
  pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
D
dapan1121 已提交
2981 2982
  pTableScanInfo->base.dataReader = NULL;

2983
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2984 2985
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2986

dengyihao's avatar
opt mem  
dengyihao 已提交
2987 2988 2989
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2990 2991
  }

2992
  taosArrayDestroy(pTableScanInfo->queryConds);
2993
  destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
2994 2995 2996 2997 2998

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
D
dapan1121 已提交
2999
  taosMemoryFreeClear(param);
3000 3001 3002 3003
}

/*
 * Explain callback: hands back a newly allocated STableMergeScanExecInfo holding a copy of the
 * operator's block-load recorder and sort statistics. On success *pOptrExplain receives the
 * allocation and *len its size, and TSDB_CODE_SUCCESS is returned. On allocation failure
 * *pOptrExplain is NULL, *len is 0, and TSDB_CODE_OUT_OF_MEMORY is returned.
 */
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->base.readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3016
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
3017
                                                STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
3018 3019 3020 3021 3022
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3023

3024 3025 3026
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
3027
  int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
H
Haojun Liao 已提交
3028
                                     &pInfo->base.matchInfo);
H
Haojun Liao 已提交
3029 3030 3031
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3032

H
Haojun Liao 已提交
3033
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
3034
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3035
    taosArrayDestroy(pInfo->base.matchInfo.pList);
3036 3037 3038 3039
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
3040
    SExprSupp* pSup = &pInfo->base.pseudoSup;
3041
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
3042
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
3043 3044 3045 3046
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

H
Haojun Liao 已提交
3047 3048 3049 3050 3051 3052
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

H
Haojun Liao 已提交
3053
  pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
H
Haojun Liao 已提交
3054 3055
  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
3056
  pInfo->base.readHandle = *readHandle;
3057 3058 3059

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;
3060
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
3061

3062
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3063
  pInfo->sample.seed = taosGetTimestampSec();
H
Haojun Liao 已提交
3064 3065 3066 3067 3068 3069

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
3070
  initResultSizeInfo(&pOperator->resultInfo, 1024);
H
Haojun Liao 已提交
3071
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
3072 3073
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

3074
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3075

H
Haojun Liao 已提交
3076
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
3077
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3078
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
3079

dengyihao's avatar
dengyihao 已提交
3080
  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
A
Alex Duan 已提交
3081 3082
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
3083

L
Liu Jicong 已提交
3084 3085
  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3086
  pOperator->exprSupp.numOfExprs = numOfCols;
3087

3088 3089
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
3090 3091 3092 3093 3094 3095 3096 3097 3098
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
3099 3100 3101 3102

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
3103
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
3104
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3105
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI);
S
slzhou 已提交
3106
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3107
                                                   SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI);
S
slzhou 已提交
3108 3109
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
3110 3111
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
3112 3113 3114 3115 3116 3117 3118
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179
// Column names that the table-count scan recognises as its group keys.
static const char GROUP_TAG_DB_NAME[] = "db_name";
static const char GROUP_TAG_STABLE_NAME[] = "stable_name";

/*
 * For each target in scanCols whose column is named "db_name" / "stable_name", record its slotId in
 * supp->dbNameSlotId / supp->stbNameSlotId. A NULL list leaves supp untouched.
 * Returns TSDB_CODE_QRY_SYS_ERROR if a node is not a target wrapping a column.
 */
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCur = NULL;
  FOREACH(pCur, scanCols) {
    if (nodeType(pCur) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pCur;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    const char* colName = ((SColumnNode*)pTarget->pExpr)->colName;
    if (strcmp(colName, GROUP_TAG_DB_NAME) == 0) {
      supp->dbNameSlotId = pTarget->slotId;
    } else if (strcmp(colName, GROUP_TAG_STABLE_NAME) == 0) {
      supp->stbNameSlotId = pTarget->slotId;
    }
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Record in supp->tbCountSlotId the slot of the FUNCTION_TYPE_TABLE_COUNT target found in pseudoCols.
 * A NULL list leaves supp untouched. Returns TSDB_CODE_QRY_SYS_ERROR if a node is not a target
 * wrapping a function.
 */
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCur = NULL;
  FOREACH(pCur, pseudoCols) {
    if (nodeType(pCur) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pCur;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_FUNCTION) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr;
    if (pFunc->funcType == FUNCTION_TYPE_TABLE_COUNT) {
      supp->tbCountSlotId = pTarget->slotId;
    }
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Derive the grouping inputs of the table-count scan.
 *
 * With no groupTags, the db and table parts of tableName are copied into supp->dbNameFilter and
 * supp->stbNameFilter. Otherwise supp->groupByDbName / supp->groupByStbName are set for the
 * "db_name" / "stable_name" columns found in groupTags. Returns TSDB_CODE_QRY_SYS_ERROR if a
 * group tag is not a column node.
 */
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags == NULL) {
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCur = NULL;
  FOREACH(pCur, groupTags) {
    if (nodeType(pCur) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    // the two names differ, so at most one of the flags is set per column
    const char* colName = ((SColumnNode*)pCur)->colName;
    if (strcmp(colName, GROUP_TAG_DB_NAME) == 0) {
      supp->groupByDbName = true;
    } else if (strcmp(colName, GROUP_TAG_STABLE_NAME) == 0) {
      supp->groupByStbName = true;
    }
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Populate the table-count scan support struct: grouping inputs first, then the output slots of the
 * db-name, stable-name and count columns. A slot stays -1 when the plan has no such column.
 * Returns the first non-success code from the three steps; each failure is logged.
 */
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t code = tblCountScanGetInputs(groupTags, tableName, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return code;
  }

  // -1 means "column not present"; the lookups below overwrite it when the column exists
  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  code = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return code;
  }

  code = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
  }
  return code;
}
S
shenglian zhou 已提交
3212

S
slzhou 已提交
3213
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3214 3215 3216
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3217
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3218
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3219
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3220 3221 3222 3223 3224 3225 3226 3227 3228

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3229
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3230 3231
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3232 3233 3234
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3235 3236 3237

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3238 3239
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3251 3252 3253
/*
 * Write one table-count row into row 0 of pRes and set pRes->info.rows to 1.
 *
 * Only slots present in the output are written; a slot id of -1 means the
 * column was not requested.
 *
 * @param pSupp   slot ids of the db-name, stable-name and count columns.
 * @param dbName  database name; asserted non-empty when the db-name slot exists.
 * @param stbName stable name; an empty string is written as NULL.
 * @param count   table count to report.
 * @param pRes    result block receiving the row.
 */
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
    ASSERT(strlen(dbName));
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);

    char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);
    // tstrncpy may truncate; the var-string length must match what was copied,
    // not the length of the source string.
    varDataSetLen(varDbName, strlen(varDataVal(varDbName)));
    colDataSetVal(colInfoData, 0, varDbName, false);
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
    size_t           stbNameLen = strlen(stbName);
    if (stbNameLen != 0) {
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      // Clamp to the payload capacity so the stored length never exceeds the copied bytes.
      if (stbNameLen > TSDB_TABLE_NAME_LEN) {
        stbNameLen = TSDB_TABLE_NAME_LEN;
      }
      memcpy(varDataVal(varStbName), stbName, stbNameLen);
      varDataSetLen(varStbName, stbNameLen);
      colDataSetVal(colInfoData, 0, varStbName, false);
    } else {
      colDataSetNULL(colInfoData, 0);
    }
  }

  if (pSupp->tbCountSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
    colDataSetVal(colInfoData, 0, (char*)&count, false);
  }
  pRes->info.rows = 1;
}

S
slzhou 已提交
3283
/*
 * Table-count path used when the read handle carries an mnode: counts come from
 * the information_schema and performance_schema metadata tables.
 *
 * @return pRes when a row was produced, otherwise NULL.
 */
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
  size_t numOfInfoDbTables = 0;
  size_t numOfPerfDbTables = 0;
  getInfosDbMeta(NULL, &numOfInfoDbTables);
  getPerfDbMeta(NULL, &numOfPerfDbTables);

  STableCountScanSupp* pSupp = &pInfo->supp;
  SSDataBlock*         pRes = pInfo->pRes;

  bool grouped = pSupp->groupByDbName || pSupp->groupByStbName;
  if (grouped) {
    buildSysDbGroupedTableCount(pOperator, pInfo, pSupp, pRes, numOfInfoDbTables, numOfPerfDbTables);
  } else {
    buildSysDbFilterTableCount(pOperator, pSupp, pRes, numOfInfoDbTables, numOfPerfDbTables);
  }

  return (pRes->info.rows > 0) ? pRes : NULL;
}

S
slzhou 已提交
3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316
/*
 * Non-grouped system-db count.
 *
 * With no db filter, emits one row holding the combined count of both system
 * dbs. With a filter naming one of the system dbs, emits that db's count. Any
 * other filter produces no row. The operator is always marked completed.
 */
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  const char* filter = pSupp->dbNameFilter;

  if (filter[0] == '\0') {
    fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
  } else if (strcmp(filter, TSDB_INFORMATION_SCHEMA_DB) == 0) {
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (strcmp(filter, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  }

  setOperatorCompleted(pOperator);
}

/*
 * Grouped system-db count: one group per call, selected by pInfo->currGrpIdx.
 *
 * Index 0 emits information_schema, index 1 emits performance_schema, and any
 * later index marks the operator completed. The group id is derived from the db
 * name when grouping by db, otherwise from the empty string. currGrpIdx is
 * advanced on every call.
 */
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  char*  sysDb = NULL;
  size_t numOfTables = 0;

  switch (pInfo->currGrpIdx) {
    case 0:
      sysDb = TSDB_INFORMATION_SCHEMA_DB;
      numOfTables = infodbTableNum;
      break;
    case 1:
      sysDb = TSDB_PERFORMANCE_SCHEMA_DB;
      numOfTables = perfdbTableNum;
      break;
    default:
      break;
  }

  if (sysDb != NULL) {
    char* groupKey = pSupp->groupByDbName ? sysDb : "";
    pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));
    fillTableCountScanDataBlock(pSupp, sysDb, "", numOfTables, pRes);
  } else {
    setOperatorCompleted(pOperator);
  }

  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3342
/*
 * getNext callback of the table-count scan operator.
 *
 * Clears the previous result, returns NULL once the operator is done, and
 * otherwise dispatches to the mnode (system-db) path when the read handle
 * carries an mnode, or to the vnode path.
 */
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
  STableCountScanOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*                 pRes = pInfo->pRes;

  blockDataCleanup(pRes);

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  return (pInfo->readHandle.mnd != NULL) ? buildSysDbTableCount(pOperator, pInfo)
                                         : buildVnodeDbTableCount(pOperator, pInfo, &pInfo->supp, pRes);
}

/*
 * Table-count path for a vnode.
 *
 * Looks up the vnode's db name and vgroup id, keeps only the db part of the
 * name, then delegates to the grouped or filtered builder.
 *
 * @return pRes when a row was produced, otherwise NULL.
 */
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;

  const char* fullDbName = NULL;
  int32_t     vgId = 0;
  pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &fullDbName, &vgId, NULL, NULL);

  // Parsed with account + db components (T_NAME_ACCT | T_NAME_DB); only the db part is kept.
  SName name = {0};
  tNameFromString(&name, fullDbName, T_NAME_ACCT | T_NAME_DB);

  char dbName[TSDB_DB_NAME_LEN] = {0};
  tNameGetDbName(&name, dbName);

  bool grouped = pSupp->groupByDbName || pSupp->groupByStbName;
  if (grouped) {
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
  } else {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
  }

  return (pRes->info.rows > 0) ? pRes : NULL;
}

/*
 * Grouped table count on a vnode.
 *
 * Grouping by db only: emits one row with the vnode's table count and marks the
 * operator completed.
 *
 * Grouping by stable: emits one group per call, selected by pInfo->currGrpIdx.
 * Super tables come first, in the order of pInfo->stbUidList (built lazily on
 * the first call). They are followed by one normal-table group. The call after
 * that marks the operator completed.
 *
 * Fails the task with TSDB_CODE_OUT_OF_MEMORY if the uid list cannot be
 * allocated.
 */
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  if (!pSupp->groupByStbName) {
    pRes->info.id.groupId = calcGroupId(dbName, strlen(dbName));

    int64_t dbTableCount = 0;
    pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &dbTableCount, NULL);
    fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
    setOperatorCompleted(pOperator);
    return;
  }

  if (pInfo->stbUidList == NULL) {
    pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
    if (pInfo->stbUidList == NULL) {
      // Do not hand a NULL list to storeGetTableList; fail the task instead.
      qError("vgId:%d, failed to allocate stb id list", vgId);
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }
    // A lookup failure is only logged; grouping proceeds with the list as filled so far.
    if (pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, TSDB_SUPER_TABLE, pInfo->stbUidList) < 0) {
      qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
    }
  }

  if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
    tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
    buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid, pAPI);
    pInfo->currGrpIdx++;
  } else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
    // One extra group after all super tables: the normal tables.
    buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName, pAPI);
    pInfo->currGrpIdx++;
  } else {
    setOperatorCompleted(pOperator);
  }
}

/*
 * Non-grouped table count on a vnode.
 *
 * When both a db filter and a stable filter are set, reports the stable's
 * child-table count. Otherwise reports the vnode's table count from
 * getBasicInfo. The operator is always marked completed.
 */
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;

  bool hasStbFilter = (pSupp->dbNameFilter[0] != '\0') && (pSupp->stbNameFilter[0] != '\0');
  if (hasStbFilter) {
    uint64_t stbUid = 0;
    pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &stbUid);

    int64_t numOfChildTables = 0;
    pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &numOfChildTables);
    fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numOfChildTables, pRes);
  } else {
    int64_t tbNumVnode = 0;
    pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &tbNumVnode, NULL);
    fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
  }

  setOperatorCompleted(pOperator);
}

static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3445
                                           SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI) {
S
slzhou 已提交
3446
  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
D
dapan1121 已提交
3447 3448 3449
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
  }
X
Xiaoyu Wang 已提交
3450

S
slzhou 已提交
3451 3452
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;
3453

3454
  int64_t numOfTables = 0;
3455
  pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, NULL, &numOfTables);
H
Haojun Liao 已提交
3456

3457 3458
  if (numOfTables != 0) {
    fillTableCountScanDataBlock(pSupp, dbName, "", numOfTables, pRes);
3459
  }
S
slzhou 已提交
3460 3461 3462
}

static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3463
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI) {
S
slzhou 已提交
3464
  char stbName[TSDB_TABLE_NAME_LEN] = {0};
3465
  pAPI->metaFn.getTableNameByUid(pInfo->readHandle.vnode, stbUid, stbName);
S
slzhou 已提交
3466 3467

  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
D
dapan1121 已提交
3468
  if (pSupp->groupByDbName) {
H
Haojun Liao 已提交
3469
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, varDataVal(stbName));
D
dapan1121 已提交
3470
  } else {
H
Haojun Liao 已提交
3471
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s", varDataVal(stbName));
D
dapan1121 已提交
3472
  }
X
Xiaoyu Wang 已提交
3473

S
slzhou 已提交
3474 3475 3476
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;

H
Haojun Liao 已提交
3477 3478
  int64_t ctbNum = 0;
  int32_t code = pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &ctbNum);
H
Haojun Liao 已提交
3479
  fillTableCountScanDataBlock(pSupp, dbName, varDataVal(stbName), ctbNum, pRes);
S
shenglian zhou 已提交
3480 3481 3482
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3483
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3484 3485
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3486
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3487 3488
  taosMemoryFreeClear(param);
}
dengyihao's avatar
dengyihao 已提交
3489 3490

// clang-format on