scanoperator.c 136.9 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorInt.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
33 34
#include "operator.h"
#include "querytask.h"
H
Haojun Liao 已提交
35

36 37 38
#include "storageapi.h"
#include "wal.h"

D
dapan1121 已提交
39 40
int32_t scanDebug = 0;

X
Xiaoyu Wang 已提交
41
#define MULTI_READER_MAX_TABLE_NUM   5000
H
Haojun Liao 已提交
42
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
43
#define SWITCH_ORDER(n)              (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
L
fix bug  
liuyao 已提交
44
#define STREAM_SCAN_OP_NAME          "StreamScanOperator"
L
liuyao 已提交
45
#define STREAM_SCAN_OP_STATE_NAME    "StreamScanFillHistoryState"
46

H
Haojun Liao 已提交
47 48 49 50 51 52 53 54 55
// Execution statistics bundle: a file block load recorder together with sort execution info.
// NOTE(review): presumably reported by the table merge scan operator (going by the name); its users are
// outside this part of the file.
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;  // block load counters
  SSortExecInfo          sortExecInfo;   // sort execution statistics
} STableMergeScanExecInfo;

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
  uint64_t       uid;
56
  STsdbReader*   reader;
H
Haojun Liao 已提交
57 58
} STableMergeScanSortSourceParam;

59 60 61 62 63 64 65 66 67 68
// Operator-level state bundle named for the table count scan.
// NOTE(review): the code using this struct is outside this part of the file; per-field notes are
// limited to what the declarations themselves show.
typedef struct STableCountScanOperatorInfo {
  SReadHandle  readHandle;  // read handle stored by value
  SSDataBlock* pRes;        // data block pointer, presumably the result block

  STableCountScanSupp supp;  // supporting information for the count scan

  int32_t currGrpIdx;  // presumably the index of the group currently being produced
  SArray* stbUidList;  // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;

L
Liu Jicong 已提交
69
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);

// Decide whether the current data block takes part in the scan.
//
// Block sampling is compiled out at the moment, so every block is accepted. The disabled implementation
// accepted everything for a sampleRatio of 1 and otherwise drew a random number from pInfo->seed and kept
// the block only when that number was a multiple of (1 / sampleRatio).
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
  (void)pInfo;  // unused while sampling is disabled
  return true;
}

84
// Toggle the order field (TSDB_ORDER_ASC <-> TSDB_ORDER_DESC) of the first numOfOutput function contexts.
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SWITCH_ORDER(pCtx[idx].order);
    idx += 1;
  }
}

90
// Check whether the time range of a data block spans more than one interval window.
//
// pInterval:  interval definition; an interval of 0 means the upstream operator is not an interval
//             operator, in which case there is never an overlap.
// pBlockInfo: block whose [window.skey, window.ekey] range is tested.
// order:      scan order; windows are walked forward for TSDB_ORDER_ASC and backward otherwise.
// returns true when a window boundary falls inside the block range.
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  // 0 by default, which means the upstream operator is not an interval operator.
  if (pInterval->interval == 0) {
    return false;
  }

  const STimeWindow* pBlkWin = &pBlockInfo->window;
  STimeWindow        win = {0};

  if (order != TSDB_ORDER_ASC) {
    // descending scan: start from the window holding the last timestamp of the block
    win = getAlignQueryTimeWindow(pInterval, pBlkWin->ekey);
    ASSERT(win.skey <= pBlkWin->ekey);

    if (win.skey > pBlkWin->skey) {
      return true;
    }

    for (;;) {
      getNextTimeWindow(pInterval, &win, order);
      if (win.ekey < pBlkWin->skey) {
        return false;
      }

      ASSERT(win.skey < pBlkWin->skey);
      if (pBlkWin->skey <= TMIN(win.ekey, pBlkWin->ekey)) {
        return true;
      }
    }
  }

  // ascending scan: start from the window holding the first timestamp of the block
  win = getAlignQueryTimeWindow(pInterval, pBlkWin->skey);
  ASSERT(win.ekey >= pBlkWin->skey);

  if (win.ekey < pBlkWin->ekey) {
    return true;
  }

  for (;;) {
    getNextTimeWindow(pInterval, &win, order);
    if (win.skey > pBlkWin->ekey) {
      return false;
    }

    ASSERT(win.ekey > pBlkWin->ekey);
    if (TMAX(win.skey, pBlkWin->skey) <= pBlkWin->ekey) {
      return true;
    }
  }
}

141 142 143 144 145 146 147 148 149 150 151
// Used by the table scanner to reach the temporary aggregate result of a group.
//
// The result row position is looked up by groupId in the result-row hash table of the pushed-down
// aggregate support (base.pdInfo.pAggSup), and the buffer page holding the row is fetched.
//
// pPage (out): the fetched buffer page; written only when the group has an entry in the hash table. The
//              caller hands it back with releaseBufPage().
// returns the result row, or NULL when the operator is not a table scan, the group has no entry yet, or
//         the page cannot be fetched.
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  STableScanInfo* pScan = pOperator->info;

  int64_t keyBuf[2] = {0};
  SET_RES_WINDOW_KEY((char*)keyBuf, &groupId, sizeof(groupId), groupId);

  SResultRowPosition* pPos = (SResultRowPosition*)tSimpleHashGet(
      pScan->base.pdInfo.pAggSup->pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
  if (pPos == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pScan->base.pdInfo.pAggSup->pResultBuf, pPos->pageId);
  if (*pPage == NULL) {
    return NULL;
  }

  char* pageStart = (char*)(*pPage);
  return (SResultRow*)(pageStart + pPos->offset);
}

167 168 169 170 171 172 173 174
// Record table `uid` in the scan's ignore table, creating the hash table on first use.
// The value stored for the uid is the current scanTimes of the scan.
//
// returns TSDB_CODE_OUT_OF_MEMORY when the hash table cannot be created, TSDB_CODE_SUCCESS otherwise.
// NOTE(review): the result of taosHashPut() is not inspected here.
static int32_t insertTableToScanIgnoreList(STableScanInfo* pTableScanInfo, uint64_t uid) {
  if (pTableScanInfo->pIgnoreTables == NULL) {
    // size the hash table by the number of tables in the scan's table list
    int32_t numOfTables = taosArrayGetSize(pTableScanInfo->base.pTableListInfo->pTableList);

    pTableScanInfo->pIgnoreTables =
        taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (pTableScanInfo->pIgnoreTables == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  taosHashPut(pTableScanInfo->pIgnoreTables, &uid, sizeof(uid), &pTableScanInfo->scanTimes,
              sizeof(pTableScanInfo->scanTimes));
  return TSDB_CODE_SUCCESS;
}

181 182
// Ask every pushed-down function whether, given its current intermediate result for the block's group,
// the data of this block is still required.
//
// When all functions answer FUNC_DATA_REQUIRED_NOT_LOAD, *status is set to that value and the table of
// the block is added to the scan's ignore list. In every other case *status is left untouched.
//
// returns TSDB_CODE_SUCCESS, or the error of insertTableToScanIgnoreList().
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScan = pOperator->info;
  SExprSupp*      pExprSup = pScan->base.pdInfo.pExprSup;

  // nothing was pushed down to this scan
  if (pExprSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pBufPage = NULL;
  SResultRow* pResRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pBufPage);
  if (pResRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // stop at the first function that still needs the block
  bool    skipBlock = true;
  int32_t idx = 0;
  while (skipBlock && idx < pExprSup->numOfExprs) {
    SResultRowEntryInfo* pEntryInfo = getResultEntryInfo(pResRow, idx, pExprSup->rowEntryInfoOffset);

    int32_t required = fmFuncDynDataRequired(pExprSup->pCtx[idx].functionId, pEntryInfo, &pBlockInfo->window);
    skipBlock = (required == FUNC_DATA_REQUIRED_NOT_LOAD);
    idx += 1;
  }

  // release buffer pages
  releaseBufPage(pScan->base.pdInfo.pAggSup->pResultBuf, pBufPage);

  if (!skipBlock) {
    return TSDB_CODE_SUCCESS;
  }

  *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  return insertTableToScanIgnoreList(pScan, pBlockInfo->id.uid);
}

H
Haojun Liao 已提交
222
// Evaluate the filter against the per-column block statistics (SMA).
// returns true when the block has to be kept: either there is no filter / no statistics to decide on, or
//         filterRangeExecute() reports that the block may contain qualifying rows.
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                               int32_t numOfRows) {
  if (pFilterInfo == NULL || pColsAgg == NULL) {
    return true;
  }

  return filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
}

H
Haojun Liao 已提交
232
// Retrieve the block SMA (pre-computed column statistics) of the current block from the reader.
// A reader error aborts the task through T_LONG_JMP.
// returns true only when every column has statistics and none of them is a NULL SMA.
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;

  bool allHaveAgg = true;
  bool nullSMA = false;

  int32_t code =
      pAPI->tsdReader.tsdReaderRetrieveBlockSMAInfo(pTableScanInfo->dataReader, pBlock, &allHaveAgg, &nullSMA);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return allHaveAgg && !nullSMA;
}

H
Haojun Liao 已提交
248
// Fill the tag / pseudo columns of pBlock for `rows` rows when the scan has pseudo expressions.
// Any error other than TSDB_CODE_PAR_TABLE_NOT_EXIST aborts the task through T_LONG_JMP; afterwards
// terrno is cleared.
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
                               int32_t rows) {
  SExprSupp* pPseudoSup = &pTableScanInfo->pseudoSup;
  if (pPseudoSup->numOfExprs <= 0) {
    return;
  }

  int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pPseudoSup->pExprInfo, pPseudoSup->numOfExprs,
                                        pBlock, rows, GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);

  // a missing table is tolerated, since the table may have been dropped during the scan procedure.
  bool fatal = (code != TSDB_CODE_SUCCESS) && (code != TSDB_CODE_PAR_TABLE_NOT_EXIST);
  if (fatal) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // reset the error code.
  terrno = 0;
}

265
// Apply the remaining OFFSET and the LIMIT of pLimitInfo to pBlock.
//
// Rows still covered by the offset are dropped from the head of the block (the whole block is emptied
// when the offset covers it completely). When the block would reach or exceed the limit, it is cut so
// that exactly `limit` rows are produced in total. numOfOutputRows is advanced by the rows that remain.
//
// returns true when the limit has been reached with this block, false otherwise.
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  const char* id = GET_TASKID(pTaskInfo);

  if (pLimitInfo->remainOffset > 0) {
    if (pLimitInfo->remainOffset >= pBlock->info.rows) {
      // the offset swallows the whole block
      pLimitInfo->remainOffset -= pBlock->info.rows;
      blockDataEmpty(pBlock);
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
      return false;
    }

    blockDataTrimFirstRows(pBlock, pLimitInfo->remainOffset);
    pLimitInfo->remainOffset = 0;
  }

  SLimit* pLimit = &pLimitInfo->limit;
  bool    limitReached =
      (pLimit->limit != -1) && (pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows));

  if (limitReached) {
    // limit the output rows
    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, keep);
  }

  pLimitInfo->numOfOutputRows += pBlock->info.rows;

  if (limitReached) {
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
  }
  return limitReached;
}

H
Haojun Liao 已提交
295
// Decide how much of the current data block has to be read and, when required, load and post-process it.
//
// The required level starts from pTableScanInfo->dataBlockLoadFlag and is raised to a full data load when
// the operator has a filter or the block overlaps an interval window boundary. Depending on the level the
// block is filtered out, skipped (only tag/pseudo columns are filled), answered from the block SMA, or
// fully loaded. A fully loaded block is filtered and then trimmed by limit/offset; reaching the limit
// marks the operator as completed.
//
// status (out): final FUNC_DATA_REQUIRED_* decision for this block.
// returns TSDB_CODE_SUCCESS, or the error reported while pruning, retrieving or filtering the block.
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*            pAPI = &pTaskInfo->storageAPI;
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
  int32_t                 code = TSDB_CODE_SUCCESS;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  bool loadSMA = false;
  *status = pTableScanInfo->dataBlockLoadFlag;
  if (pOperator->exprSupp.pFilterInfo != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    // NOTE(review): totalRows was already increased at the top of this function; this second increment
    // looks like double counting -- confirm before changing.
    pCost->totalRows += pBlock->info.rows;
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows,
           pBlockInfo->id.uid);
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
    pCost->skipBlocks += 1;
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
    pCost->loadBlockStatis += 1;
    loadSMA = true;  // mark the operation of load sma;
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {  // the block statistics are sufficient, the data of the block is not needed
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
      return TSDB_CODE_SUCCESS;
    } else {  // block statistics are incomplete, load the data block instead
      qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      size_t size = taosArrayGetSize(pBlock->pDataBlock);
      bool   keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
      if (!keep) {
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
               GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
        pCost->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

        pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results
  code = doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (code != TSDB_CODE_SUCCESS) {
    // the ignore list could not be updated; report the failure instead of silently carrying on
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
    return code;
  }

  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);

    // once every table of the scan is on the ignore list, the remaining blocks can all be dropped
    STableScanInfo* p1 = pOperator->info;
    if (taosHashGetSize(p1->pIgnoreTables) == taosArrayGetSize(p1->base.pTableListInfo->pTableList)) {
      *status = FUNC_DATA_REQUIRED_ALL_FILTEROUT;
    } else {
      *status = FUNC_DATA_REQUIRED_FILTEROUT;
    }
    return TSDB_CODE_SUCCESS;
  }

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SSDataBlock* p = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (p == NULL) {
    return terrno;
  }

  ASSERT(p == pBlock);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);

  // restore the previous value
  pCost->totalRows -= pBlock->info.rows;

  if (pOperator->exprSupp.pFilterInfo != NULL) {
    // start the clock before filtering, so that filterTime covers the filter execution itself
    int64_t st = taosGetTimestampUs();

    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    double el = (taosGetTimestampUs() - st) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;

    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
  }

  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
  if (limitReached) {  // set operator flag is done
    setOperatorCompleted(pOperator);
  }

  pCost->totalRows += pBlock->info.rows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
426
// Switch the scan over to a reverse (descending) pass: set the reverse-scan flag, flip the order stored
// in the function contexts, and make the query condition descending with swapped time window bounds.
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  SET_REVERSE_SCAN_FLAG(pTableScanInfo);
  switchCtxOrder(pCtx, numOfOutput);

  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
  TSWAP(pTableScanInfo->cond.twindows.skey, pTableScanInfo->cond.twindows.ekey);
}

435 436
typedef struct STableCachedVal {
  const char* pName;
437
  STag*       pTags;
438 439
} STableCachedVal;

440 441 442 443 444 445 446 447 448 449 450
// Release a STableCachedVal together with the name and tag buffers it points to. NULL is accepted.
static void freeTableCachedVal(void* param) {
  STableCachedVal* pCached = param;
  if (pCached != NULL) {
    taosMemoryFree((void*)pCached->pName);
    taosMemoryFree(pCached->pTags);
    taosMemoryFree(pCached);
  }
}

H
Haojun Liao 已提交
451 452
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
453
  pVal->pName = taosStrdup(pMetaReader->me.name);
H
Haojun Liao 已提交
454 455 456 457
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
458
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
459 460 461 462 463 464 465
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

466
// Deleter handed to the LRU cache together with each cached table meta entry: only `value` (a
// STableCachedVal) is released, the key and the user data are not used.
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value, void* ud) {
  (void)ud;
  (void)keyLen;
  (void)key;

  if (value != NULL) {
    freeTableCachedVal(value);
  }
}
473

474 475 476 477 478
// Mark rows [0, pBlock->info.rows) as NULL in the destination column of each of the numOfExpr expressions.
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
  int32_t idx = 0;
  while (idx < numOfExpr) {
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, pExpr[idx].base.resSchema.slotId);
    colDataSetNNULL(pDstCol, 0, pBlock->info.rows);
    idx += 1;
  }
}

483 484
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
485
  // currently only the tbname pseudo column
486
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
487
    return TSDB_CODE_SUCCESS;
488 489
  }

490
  int32_t code = 0;
491
  bool    freeReader = false;
492

493 494 495 496
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

497
  STableCachedVal val = {0};
498 499

  SMetaReader mr = {0};
500
  LRUHandle*  h = NULL;
501

502 503 504
  // todo refactor: extract method
  // the handling of the null data should be packed in the extracted method

505
  // 1. check if it is existed in meta cache
506
  if (pCache == NULL) {
K
kailixu 已提交
507
    pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pHandle->api.metaFn);
508
    code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
509
    if (code != TSDB_CODE_SUCCESS) {
510
      // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
H
Haojun Liao 已提交
511
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
S
slzhou 已提交
512 513
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);
514 515 516

        // append null value before return to caller, since the caller will ignore this error code and proceed
        doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
517
      } else {
S
slzhou 已提交
518 519
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
H
Haojun Liao 已提交
520
      }
521
      pHandle->api.metaReaderFn.clearReader(&mr);
522 523 524
      return terrno;
    }

525
    pHandle->api.metaReaderFn.readerReleaseLock(&mr);
526

527 528
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
529 530

    freeReader = true;
531
  } else {
532 533
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
534
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
535
    if (h == NULL) {
H
Haojun Liao 已提交
536
      pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0, &pHandle->api.metaFn);
537
      code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
538
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
539
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
540
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
541
                pBlock->info.id.uid, tstrerror(terrno), idStr);
542 543
          // append null value before return to caller, since the caller will ignore this error code and proceed
          doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
544
        } else {
H
Haojun Liao 已提交
545
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
546
                 idStr);
H
Haojun Liao 已提交
547
        }
548
        pHandle->api.metaReaderFn.clearReader(&mr);
549 550 551
        return terrno;
      }

552
      pHandle->api.metaReaderFn.readerReleaseLock(&mr);
553

H
Haojun Liao 已提交
554
      STableCachedVal* pVal = createTableCacheVal(&mr);
555

H
Haojun Liao 已提交
556
      val = *pVal;
557
      freeReader = true;
H
Haojun Liao 已提交
558

H
Haojun Liao 已提交
559
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
D
dapan1121 已提交
560
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
561 562 563 564 565 566 567 568
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
569

H
Haojun Liao 已提交
570
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
571
    }
H
Haojun Liao 已提交
572

573 574
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
575
  }
576

577 578
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
579
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
580 581

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
582
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
583

584
    int32_t functionId = pExpr1->pExpr->_function.functionId;
585 586 587

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
588
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
589
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
590
      STagVal tagVal = {0};
591
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
592
      const char* p = pHandle->api.metaFn.extractTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
593

594 595 596 597
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
598
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
599
      }
600

H
Haojun Liao 已提交
601 602
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
603
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
604
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
D
dapan1121 已提交
605
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
H
Haojun Liao 已提交
606 607 608
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
D
dapan1121 已提交
609 610
        if (code) {
          if (freeReader) {
611
            pHandle->api.metaReaderFn.clearReader(&mr);
D
dapan1121 已提交
612 613 614
          }
          return code;
        }
L
Liu Jicong 已提交
615
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
616
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
617
          colDataSetVal(pColInfoData, i, data, false);
H
Haojun Liao 已提交
618
        }
619 620 621 622
      }
    }
  }

623 624
  // restore the rows
  pBlock->info.rows = backupRows;
625
  if (freeReader) {
626
    pHandle->api.metaReaderFn.clearReader(&mr);
627 628
  }

H
Haojun Liao 已提交
629
  return TSDB_CODE_SUCCESS;
630 631
}

H
Haojun Liao 已提交
632
// Fill pColInfoData with the table name for pBlock->info.rows rows.
//
// The name is wrapped into a one-row temporary VARCHAR column and handed to the scalar implementation of
// functionId, which writes into pColInfoData. When no implementation is registered for functionId, or the
// temporary column cannot be allocated, an error is logged and pColInfoData is left as it is.
//
// name: NUL-terminated table name.
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

  size_t len = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
  char   buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(buf, name);

  SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, len, 1);

  // the temporary column must own memory for one row before the name is stored in it
  int32_t code = colInfoDataEnsureCapacity(&infoData, 1, false);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to prepare the table name column, functionId:%d, code:%s", functionId, tstrerror(code));
    colDataDestroy(&infoData);
    return;
  }

  colDataSetVal(&infoData, 0, buf, false);

  SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData};
  SScalarParam param = {.columnData = pColInfoData};

  if (fpSet.process != NULL) {
    fpSet.process(&srcParam, 1, &param);
  } else {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  }

  colDataDestroy(&infoData);
}

657
// Pull blocks from the storage reader until one of them survives loadDataBlock() with rows left.
//
// Reader errors, a killed task and loadDataBlock() failures abort the task through T_LONG_JMP.
// returns the result block of the scan filled with the next qualifying data, or NULL when the reader is
//         exhausted, the operator is already done, or every table has been filtered out.
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  STableScanBase* pBase = &pScan->base;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
  SSDataBlock*    pRes = pScan->pResBlock;

  bool    hasNext = false;
  int32_t code = TSDB_CODE_SUCCESS;

  int64_t startUs = taosGetTimestampUs();

  for (;;) {
    code = pAPI->tsdReader.tsdNextDataBlock(pBase->dataReader, &hasNext);
    if (code) {
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pBase->dataReader);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      return NULL;
    }

    if (isTaskKilled(pTaskInfo)) {
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pBase->dataReader);
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (pOperator->status == OP_EXEC_DONE) {
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pBase->dataReader);
      return NULL;
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pScan->sample)) {
      continue;
    }

    if (pRes->info.id.uid) {
      pRes->info.id.groupId = getTableGroupId(pBase->pTableListInfo, pRes->info.id.uid);
    }

    uint32_t status = 0;
    code = loadDataBlock(pOperator, pBase, pRes, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // all tables are filtered out, nothing more to scan
    if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
      return NULL;
    }

    // current block is filter out according to filter condition, continue load the next block
    bool blockDropped = (status == FUNC_DATA_REQUIRED_FILTEROUT) || (pRes->info.rows == 0);
    if (blockDropped) {
      continue;
    }

    // publish the statistics collected so far and hand out the block
    pOperator->resultInfo.totalRows = pBase->readRecorder.totalRows;
    pBase->readRecorder.elapsedTime += (taosGetTimestampUs() - startUs) / 1000.0;

    pOperator->cost.totalCost = pBase->readRecorder.elapsedTime;
    pRes->info.scanFlag = pBase->scanFlag;
    return pRes;
  }
}

H
Haojun Liao 已提交
724
// Drive the configured number of ascending passes followed by the descending passes over the
// tables currently bound to the reader. Returns the next non-NULL block produced by
// doTableScanImpl(), or NULL once every pass has been consumed (or no reader is available).
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  STableScanBase* pBase = &pTableScanInfo->base;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pBase->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // Phase 1: ascending-order passes.
  while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    // this pass is exhausted
    pTableScanInfo->scanTimes += 1;
    taosHashClear(pTableScanInfo->pIgnoreTables);

    if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
      // another ascending pass follows: re-arm the task and the reader
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
      pBase->scanFlag = MAIN_SCAN;
      pBase->dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
      qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));

      pAPI->tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
    }
  }

  // Phase 2: descending-order passes, if any remain.
  int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
  if (pTableScanInfo->scanTimes >= total) {
    return NULL;
  }

  if (pBase->cond.order == TSDB_ORDER_ASC) {
    // the condition is still in ascending order: flip it once before the first descending pass
    prepareForDescendingScan(pBase, pOperator->exprSupp.pCtx, 0);
    pAPI->tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
  }

  while (pTableScanInfo->scanTimes < total) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pTableScanInfo->scanTimes += 1;
    taosHashClear(pTableScanInfo->pIgnoreTables);

    if (pTableScanInfo->scanTimes < total) {
      setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
      pBase->scanFlag = MAIN_SCAN;

      qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
      pAPI->tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
    }
  }

  return NULL;
}

// Operator "next" function of the table scan.
//  - TABLE_SCAN__TABLE_ORDER: tables are fed to the reader one at a time, advancing
//    pInfo->currentTable whenever the current table yields no more data.
//  - otherwise: tables are scanned group by group, advancing pInfo->currentGroupId; the reader
//    is opened lazily when the first group is selected.
// Returns the next data block, or NULL when nothing is left (or the task was killed).
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;

  // scan table one by one sequentially
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    for (;;) {
      SSDataBlock* pRes = doGroupedTableScan(pOperator);
      if (pRes != NULL || pOperator->status == OP_EXEC_DONE || isTaskKilled(pTaskInfo)) {
        return pRes;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;

      // the table list is read under the task read latch
      taosRLockLatch(&pTaskInfo->lock);
      int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);

      if (pInfo->currentTable >= numOfTables) {
        qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
        taosRUnLockLatch(&pTaskInfo->lock);
        return NULL;
      }

      // copy the entry out so it can be used after the latch is released
      STableKeyInfo tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
      taosRUnLockLatch(&pTaskInfo->lock);

      pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
             pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));

      pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;
    }
  }

  // scan table group by group sequentially
  if (pInfo->currentGroupId == -1) {
    // first invocation: select group 0 and open the reader on it
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    STableKeyInfo* pGroupTables = NULL;
    int32_t        numOfGroupTables = 0;
    tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pGroupTables, &numOfGroupTables);
    ASSERT(pInfo->base.dataReader == NULL);

    int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pGroupTables,
                                                 numOfGroupTables, pInfo->pResBlock, (void**)&pInfo->base.dataReader,
                                                 GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // keep the operator's advertised capacity at least as large as the result block's
    if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
      pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
    }
  }

  SSDataBlock* pRes = doGroupedTableScan(pOperator);
  if (pRes != NULL) {
    return pRes;
  }

  // current group exhausted: walk the remaining groups until one yields data
  for (;;) {
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    // reset value for the next group data output
    pOperator->status = OP_OPENED;
    resetLimitInfoForNextGroup(&pInfo->base.limitInfo);

    STableKeyInfo* pGroupTables = NULL;
    int32_t        numOfGroupTables = 0;
    tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pGroupTables, &numOfGroupTables);

    pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pGroupTables, numOfGroupTables);
    pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
    pInfo->scanTimes = 0;

    pRes = doGroupedTableScan(pOperator);
    if (pRes != NULL) {
      return pRes;
    }
  }
}

877 878
// Export a heap-allocated copy of the scan's file-block load recorder for EXPLAIN output.
//   pOptr         table scan operator; its info must be a STableScanInfo
//   pOptrExplain  [out] receives the copy; it is allocated here and must be released by the caller
//   len           [out] receives sizeof(SFileBlockLoadRecorder)
// Returns 0 on success. On allocation failure returns TSDB_CODE_OUT_OF_MEMORY with
// *pOptrExplain = NULL and *len = 0.
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
  if (pRecorder == NULL) {
    // previously the NULL result was dereferenced unconditionally
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableScanInfo* pTableScanInfo = pOptr->info;
  *pRecorder = pTableScanInfo->base.readRecorder;

  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

886
// Release everything held by a STableScanBase: the query condition, the data reader (closed
// through pAPI), the column match list, the table list, the table-meta LRU cache and the
// pseudo-column expression support. The base struct itself is not freed.
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
  cleanupQueryTableDataCond(&pBase->cond);

  // close the reader and drop the now-dangling handle
  pAPI->tsdReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;

  if (pBase->matchInfo.pList) {
    taosArrayDestroy(pBase->matchInfo.pList);
  }

  tableListDestroy(pBase->pTableListInfo);
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}

static void destroyTableScanOperatorInfo(void* param) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  blockDataDestroy(pTableScanInfo->pResBlock);
904
  taosHashCleanup(pTableScanInfo->pIgnoreTables);
H
Haojun Liao 已提交
905
  destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
D
dapan1121 已提交
906
  taosMemoryFreeClear(param);
907 908
}

909
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
910
                                           STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
X
Xiaoyu Wang 已提交
911
  int32_t         code = 0;
H
Haojun Liao 已提交
912 913 914
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
915
    code = TSDB_CODE_OUT_OF_MEMORY;
916
    goto _error;
H
Haojun Liao 已提交
917 918
  }

919
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
920
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
921 922

  int32_t numOfCols = 0;
X
Xiaoyu Wang 已提交
923
  code =
H
Haojun Liao 已提交
924
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
925 926 927 928
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
929
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
930
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
931
  if (code != TSDB_CODE_SUCCESS) {
932
    goto _error;
933 934
  }

H
Haojun Liao 已提交
935
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
936
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
937
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
938
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
939 940
  }

941
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
G
Ganlin Zhao 已提交
942
  pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
H
Haojun Liao 已提交
943

H
Haojun Liao 已提交
944 945
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
946 947
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

948 949
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
950

H
Haojun Liao 已提交
951
  pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
H
Haojun Liao 已提交
952
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
953
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
X
Xiaoyu Wang 已提交
954
  //  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
955

H
Haojun Liao 已提交
956 957 958
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
959 960
  }

wmmhello's avatar
wmmhello 已提交
961
  pInfo->currentGroupId = -1;
962
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
963
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
964

L
Liu Jicong 已提交
965 966
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
967
  pOperator->exprSupp.numOfExprs = numOfCols;
968

969
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
970 971
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
972 973 974
    code = terrno;
    goto _error;
  }
975

D
dapan1121 已提交
976 977 978 979
  if (scanDebug) {
    pInfo->countOnly = true;
  }

H
Haojun Liao 已提交
980
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
981 982
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
983 984 985

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
986
  return pOperator;
987

988
_error:
989 990 991
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
992

993 994
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
995
  return NULL;
H
Haojun Liao 已提交
996 997
}

998
// Build a "TableSeqScanOperator" around an already-opened reader handle.
//   pReadHandle  reader handle, stored as the operator's data reader
//   pTaskInfo    owning task; receives TSDB_CODE_OUT_OF_MEMORY in ->code on allocation failure
// Returns the new operator, or NULL if either allocation fails.
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // previously both pointers were dereferenced without a check
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->base.dataReader = pReadHandle;

  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
  return pOperator;
}

1011
// Drop every buffered block reference held by the stream scan and rewind the read cursor.
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  // log how many entries are about to be discarded
  qDebug("clear buff blocks:%d", (int32_t)taosArrayGetSize(pInfo->pBlockLists));

  taosArrayClear(pInfo->pBlockLists);
  pInfo->validBlockIndex = 0;
}

1017
// True when the operator consuming this stream scan is a stream session-window operator.
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  return QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == pInfo->windowSup.parentType;
}

1021
// True when the operator consuming this stream scan is a stream state-window operator.
static bool isStateWindow(SStreamScanInfo* pInfo) {
  return QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == pInfo->windowSup.parentType;
}
5
54liuyao 已提交
1024

L
Liu Jicong 已提交
1025
// True when the parent operator is any of the stream interval variants
// (plain, semi or final interval).
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
      return true;
    default:
      return false;
  }
}

// True only for the plain stream interval operator (not the semi/final variants).
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == pInfo->windowSup.parentType;
}

1035 1036 1037 1038
// True for an interval window whose sliding step differs from its interval length.
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.interval != pInfo->interval.sliding;
}

1039
// Copy the group id found at (groupColIndex, rowIndex) of pBlock into pInfo->groupId.
// The column payload is read as an array of uint64_t.
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  const uint64_t*  pGroupIds = (uint64_t*)pGroupCol->pData;

  ASSERT(rowIndex < pBlock->info.rows);
  pInfo->groupId = pGroupIds[rowIndex];
}

1046
// Re-target a table scan at a new time window and version range [0, ver], then close the
// current reader. currentGroupId is reset to -1, which makes doTableScan() reopen a reader on
// its next call in group mode.
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t ver) {
  STableScanBase* pBase = &pTableScanInfo->base;

  // new query range
  pBase->cond.twindows = *pWin;
  pBase->cond.startVersion = 0;
  pBase->cond.endVersion = ver;

  // restart the scan state machine
  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;

  // drop the old reader
  pBase->readerAPI.tsdReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;
}

L
Liu Jicong 已提交
1056 1057
// Read the rows of a single table that fall in [startTs, endTs] and are visible up to
// maxVersion, using a temporary reader that is opened and closed inside this call.
//   pTableScanOp  table scan operator whose query condition and result block are reused
//   tbUid         uid of the table to read
//   maxVersion    upper bound used as the condition's endVersion (startVersion is -1)
// Returns the operator's result block if it holds at least one row, otherwise NULL.
// Reader errors are raised through T_LONG_JMP on the task's jump buffer.
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};

  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
  SQueryTableDataCond cond = pTableScanInfo->base.cond;  // private copy; the operator's cond is untouched

  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  STsdbReader* pReader = NULL;
  int32_t      code = pAPI->tsdReader.tsdReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
                                                    (void**)&pReader, GET_TASKID(pTaskInfo), false, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;  // T_LONG_JMP is not expected to return
  }

  bool hasNext = false;
  code = pAPI->tsdReader.tsdNextDataBlock(pReader, &hasNext);
  if (code != TSDB_CODE_SUCCESS) {
    // The non-local jump skips the close at the end of this function, so release the
    // temporary reader here; previously it was leaked on this path.
    pAPI->tsdReader.tsdReaderClose(pReader);
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;  // T_LONG_JMP is not expected to return
  }

  if (hasNext) {
    // the returned pointer is ignored; pBlock was handed to the reader at open time
    pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, NULL);
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
    pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
  }

  pAPI->tsdReader.tsdReaderClose(pReader);
  qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);

  return pBlock->info.rows > 0 ? pBlock : NULL;
}

// Compute the group id of the row (uid, ts) by reading that row as of maxVersion and running
// the partition expressions over it. Returns 0 when no such row can be read.
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPrevRow = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (pPrevRow == NULL || pPrevRow->info.rows == 0) {
    return 0;
  }

  // a single timestamp in a single table is expected to match exactly one row
  ASSERT(pPrevRow->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPrevRow, 0);
}

5
54liuyao 已提交
1111
// Look up the group id of table `uid` in the table list of the underlying table scan operator.
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  STableScanInfo* pScanInfo = pInfo->pTableScanOp->info;
  STableListInfo* pTableList = pScanInfo->base.pTableListInfo;
  return getTableGroupId(pTableList, uid);
}

5
54liuyao 已提交
1116 1117 1118 1119 1120 1121 1122 1123
// Resolve the group id for (uid, ts): computed from row data when the partition expression
// needs evaluation (partitionSup.needCalc), otherwise taken from the table list by uid.
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  if (!pInfo->partitionSup.needCalc) {
    return getGroupIdByUid(pInfo, uid);
  }

  return getGroupIdByCol(pInfo, uid, ts, maxVersion);
}

L
Liu Jicong 已提交
1124
// Consume the next scan range from pBlock, starting at *pRowIndex, and re-target the underlying
// table scan at it. Following rows of the same group are merged into the range while they share
// its start key (or end exactly at its start key). *pRowIndex is left on the first row that was
// not merged.
// Returns false when pBlock is empty or fully consumed, true after the table scan was reset.
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  if (pBlock->info.rows == 0 || (*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);

  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  TSKEY*    startData = (TSKEY*)pStartTsCol->pData;
  TSKEY*    endData = (TSKEY*)pEndTsCol->pData;
  uint64_t* gpData = (uint64_t*)pGpCol->pData;
  TSKEY*    calStartData = (TSKEY*)pCalStartTsCol->pData;
  TSKEY*    calEndData = (TSKEY*)pCalEndTsCol->pData;

  // the row at *pRowIndex seeds the range
  const int32_t first = *pRowIndex;
  STimeWindow   win = {.skey = startData[first], .ekey = endData[first]};
  uint64_t      groupId = gpData[first];

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, first);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[first];
    pInfo->updateWin.ekey = calEndData[first];
  }
  (*pRowIndex)++;

  // fold in the following rows for as long as they extend the seed range
  for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
    bool sameGroup = (groupId == gpData[*pRowIndex]);

    if (sameGroup && win.skey == startData[*pRowIndex]) {
      win.ekey = TMAX(win.ekey, endData[*pRowIndex]);
      continue;
    }

    if (sameGroup && win.skey == endData[*pRowIndex]) {
      win.skey = TMIN(win.skey, startData[*pRowIndex]);
      continue;
    }

    ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
           !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
    break;
  }

  STableScanInfo* pTScanInfo = pInfo->pTableScanOp->info;
  qDebug("prepare range scan start:%" PRId64 ",end:%" PRId64 ",maxVer:%" PRIu64, win.skey, win.ekey, pInfo->pUpdateInfo->maxDataVersion);
  resetTableScanInfo(pTScanInfo, &win, pInfo->pUpdateInfo->maxDataVersion);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1177
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1178
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1179
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1180
  dumyInfo.cur.pageId = -1;
1181
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1182 1183
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1184
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1185

5
54liuyao 已提交
1186
  while (1) {
1187 1188 1189
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1190
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1191 1192 1193 1194 1195
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1196
    }
5
54liuyao 已提交
1197

5
54liuyao 已提交
1198 1199 1200
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1201
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1202
    endWin = preWin;
5
54liuyao 已提交
1203
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1204 1205 1206 1207 1208 1209
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1210

L
Liu Jicong 已提交
1211
// Re-scan historical data for the ranges listed in pSDB, one range at a time, and return the
// next non-empty block that belongs to the group of the range being scanned (pInfo->groupId).
// When every range has been consumed, pSDB is cleaned, *pRowIndex is reset to 0, the update
// window is reset to the full range, the reader is closed, and NULL is returned.
// tsColIndex is not used by this function.
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  qInfo("do stream range scan. windows index:%d", *pRowIndex);

  SOperatorInfo* pScanOp = pInfo->pTableScanOp;
  bool           hasRange = true;

  for (;;) {
    SSDataBlock* pResult = doTableScan(pScanOp);
    if (pResult == NULL) {
      // current range exhausted: move on to the next one and scan next window data
      hasRange = prepareRangeScan(pInfo, pSDB, pRowIndex);
      pResult = doTableScan(pScanOp);
    }

    if (pResult == NULL) {
      if (hasRange) {
        continue;  // the range was valid but empty, try the following one
      }

      // no range left: reset all range-scan state
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};

      STableScanInfo* pTableScanInfo = pScanOp->info;
      pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
      pTableScanInfo->base.dataReader = NULL;
      return NULL;
    }

    doFilter(pResult, pScanOp->exprSupp.pFilterInfo, NULL);
    if (pResult->info.rows == 0) {
      continue;
    }

    if (!pInfo->partitionSup.needCalc) {
      // the group id is a property of the whole block
      if (pResult->info.id.groupId == pInfo->groupId) {
        pResult->info.calWin = pInfo->updateWin;
        return pResult;
      }
      continue;
    }

    // the group id is computed per row: rebuild pResult from the rows of the wanted group
    SSDataBlock* pCopy = createOneDataBlock(pResult, true);
    blockDataCleanup(pResult);

    for (int32_t row = 0; row < pCopy->info.rows; row++) {
      if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pCopy, row) != pInfo->groupId) {
        continue;
      }

      for (int32_t col = 0; col < pScanOp->exprSupp.numOfExprs; col++) {
        SColumnInfoData* pSrcCol = taosArrayGet(pCopy->pDataBlock, col);
        SColumnInfoData* pDestCol = taosArrayGet(pResult->pDataBlock, col);
        bool             isNull = colDataIsNull(pSrcCol, pCopy->info.rows, row, NULL);
        char*            pSrcData = colDataGetData(pSrcCol, row);
        colDataSetVal(pDestCol, pResult->info.rows, pSrcData, isNull);
      }
      pResult->info.rows++;
    }

    blockDataDestroy(pCopy);

    if (pResult->info.rows > 0) {
      pResult->info.calWin = pInfo->updateWin;
      return pResult;
    }
  }
}
1268

1269
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
X
Xiaoyu Wang 已提交
1270
                                   SSessionKey* pKey) {
1271 1272 1273
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
X
Xiaoyu Wang 已提交
1274

1275 1276
  void* pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
  int32_t          code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
1277 1278 1279
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
D
dapan1121 已提交
1280 1281

  taosMemoryFree(pCur);
1282 1283 1284
  return code;
}

1285
// Translate each (start ts, end ts, uid) row of pSrcBlock into a session-window scan range in
// pDestBlock: the start key of the session window containing the start ts, the end key of the
// session window containing (or preceding) the end ts, and the row's group id. The uid and both
// calculate-window columns of the output are set to NULL. Source rows whose start session
// window is invalid are skipped.
// Returns TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity().
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  // group ids are resolved against the data version preceding this block
  int64_t ver = pSrcBlock->info.version - 1;

  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver);

    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }

    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // no window contains the end ts: fall back to the preceding one
      getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    }

    // NOTE(review): startWin was already validated above, so this branch cannot be taken;
    // the intended operand may be endWin — confirm before changing.
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
      continue;
    }

    // Append at the running destination row, not at the source index: skipped source rows
    // must not leave holes below pDestBlock->info.rows.
    int32_t destRow = (int32_t)pDestBlock->info.rows;
    colDataSetVal(pDestStartCol, destRow, (const char*)&startWin.win.skey, false);
    colDataSetVal(pDestEndCol, destRow, (const char*)&endWin.win.ekey, false);

    colDataSetNULL(pDestUidCol, destRow);
    colDataSetVal(pDestGpCol, destRow, (const char*)&groupId, false);
    colDataSetNULL(pDestCalStartTsCol, destRow);
    colDataSetNULL(pDestCalEndTsCol, destRow);
    pDestBlock->info.rows++;
  }

  return TSDB_CODE_SUCCESS;
}
1339 1340 1341 1342 1343 1344

/*
 * Build the interval scan ranges for the rows of pSrcBlock and store them in pDestBlock.
 *
 * pDestBlock is always cleaned first. When the partition key has to be calculated and the first
 * source row covers a range (start ts != end ts), the previous version of that range is read and
 * pSrcBlock is rebuilt with one row per previously stored row. Afterwards the source rows are
 * folded into sliding windows; every window produces one destination row holding the window
 * range, the calculate range, the uid and the group id.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by blockDataEnsureCapacity().
 */
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t code = TSDB_CODE_SUCCESS;

  blockDataCleanup(pDestBlock);

  int32_t numOfRows = pSrcBlock->info.rows;
  if (numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SColumnInfoData* pSrcStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);

  uint64_t* pSrcUids = (uint64_t*)pSrcUidCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  pSrcStartTs = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  pSrcEndTs = (TSKEY*)pSrcEndTsCol->pData;
  int64_t preVersion = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && pSrcStartTs[0] != pSrcEndTs[0]) {
    // only the first source row is used to locate the previously written data
    uint64_t     firstUid = pSrcUids[0];
    TSKEY        firstStartTs = pSrcStartTs[0];
    TSKEY        firstEndTs = pSrcEndTs[0];
    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, firstUid, firstStartTs, firstEndTs, preVersion);
    printDataBlock(pPreRes, "pre res");

    // the source block is rebuilt from the previous version of the data
    blockDataCleanup(pSrcBlock);
    code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SColumnInfoData* pPreTsCol = taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    numOfRows = pPreRes->info.rows;

    for (int32_t row = 0; row < numOfRows; row++) {
      uint64_t preGroupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, row);
      TSKEY*   pRowTs = ((TSKEY*)pPreTsCol->pData) + row;
      appendOneRowToStreamSpecialBlock(pSrcBlock, pRowTs, pRowTs, &firstUid, &preGroupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }

  // (re)load the raw column buffers; the branch above may have rebuilt the source block
  uint64_t* pSrcGroupIds = (uint64_t*)pSrcGpCol->pData;
  pSrcStartTs = (TSKEY*)pSrcStartTsCol->pData;
  pSrcEndTs = (TSKEY*)pSrcEndTsCol->pData;
  pSrcUids = (uint64_t*)pSrcUidCol->pData;

  code = blockDataEnsureCapacity(pDestBlock, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pDstStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDstGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t cur = 0;
  while (cur < numOfRows) {
    uint64_t uid = pSrcUids[cur];
    uint64_t groupId = pSrcGroupIds[cur];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, uid, pSrcStartTs[cur], preVersion);
    }

    TSKEY calStartTs = pSrcStartTs[cur];
    colDataSetVal(pDstCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);

    // getSlidingWindow() advances 'cur'; the row just before the new position closes the calculate range
    STimeWindow win = getSlidingWindow(pSrcStartTs, pSrcEndTs, pSrcGroupIds, &pInfo->interval, &pSrcBlock->info, &cur,
                                       pInfo->partitionSup.needCalc);
    TSKEY       calEndTs = pSrcStartTs[cur - 1];

    colDataSetVal(pDstCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataSetVal(pDstUidCol, pDestBlock->info.rows, (const char*)(&uid), false);
    colDataSetVal(pDstStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataSetVal(pDstEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataSetVal(pDstGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
    pDestBlock->info.rows++;
  }

  return TSDB_CODE_SUCCESS;
}
1416

1417
/*
 * Copy the delete ranges of pSrcBlock into pDestBlock, one destination row per source row.
 *
 * For every source row the group id is taken from the block, or computed with getGroupIdByData()
 * when the stored value is 0. When a table-name expression is configured, the partition table
 * name recorded for the group is looked up in the stream state and attached to the row as a
 * var-string; otherwise (or when no name is available) the table-name column is set to NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by blockDataEnsureCapacity().
 */
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t ver = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    // var-string buffer (length header + name). It must be a byte array, not an array of pointers.
    // One spare, always-zero byte keeps the strlen() below bounded even if the stored name fills
    // all TSDB_TABLE_NAME_LEN bytes.
    char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN + 1] = {0};
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver);
    }
    if (pInfo->tbnameCalSup.pExprInfo) {
      void* parTbname = NULL;
      pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);

      // the lookup leaves parTbname untouched (NULL) when it hands back no value; never copy from NULL
      if (parTbname != NULL) {
        memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
        varDataSetLen(tbname, strlen(varDataVal(tbname)));
        pInfo->stateStore.streamStateFreeVal(parTbname);
      }
    }
    // an empty name (length header still 0) is reported as a NULL table name
    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     varDataLen(tbname) == 0 ? NULL : tbname);
  }
  return TSDB_CODE_SUCCESS;
}

1459 1460 1461 1462
/*
 * Dispatch to the scan-range builder that matches the window type of the stream:
 * interval windows, session/state windows, or (for anything else) the plain delete-result builder.
 *
 * Regardless of the builder's return code, the destination block is then tagged as STREAM_CLEAR,
 * inherits the source version, is marked as loaded and gets its time window refreshed.
 *
 * Returns the code produced by the selected builder.
 */
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t (*buildRangeFn)(SStreamScanInfo*, SSDataBlock*, SSDataBlock*) = generateDeleteResultBlock;

  if (isIntervalWindow(pInfo)) {
    buildRangeFn = generateIntervalScanRange;
  } else if (isSessionWindow(pInfo) || isStateWindow(pInfo)) {
    buildRangeFn = generateSessionScanRange;
  }

  int32_t code = buildRangeFn(pInfo, pSrcBlock, pDestBlock);

  SDataBlockInfo* pDestInfo = &pDestBlock->info;
  pDestInfo->type = STREAM_CLEAR;
  pDestInfo->version = pSrcBlock->info.version;
  pDestInfo->dataLoad = 1;
  blockDataUpdateTsWindow(pDestBlock, 0);

  return code;
}

5
54liuyao 已提交
1475
/*
 * Prepare the "create table" result for pBlock.
 *
 * pInfo->pCreateTbRes is always cleaned. When neither a table-name expression nor a tag
 * expression is configured, the partition table name of the block is reset to an empty string;
 * otherwise one create-table row is appended for the group of the block.
 */
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;

  blockDataCleanup(pInfo->pCreateTbRes);

  if (pTbNameCalSup->numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
    pBlock->info.parTbName[0] = 0;
    return;
  }

  appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, pTbNameCalSup, &pInfo->tagCalSup,
                       pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, &pInfo->stateStore);
}

1486 1487
/*
 * Append one row to a stream "special" block (delete/clear/update-range layout).
 *
 * pStartTs/pEndTs are written to both the window columns and the calculate-range columns,
 * pUid and pGp fill the uid and group id columns, and pTbName (a var-string, may be NULL)
 * fills the table-name column; a NULL pTbName stores a NULL value.
 *
 * The caller is responsible for the capacity of pBlock; the row counter is increased by one.
 */
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  const int64_t rowIdx = pBlock->info.rows;
  const bool    noTbName = (pTbName == NULL);

  // window range and calculate range share the same timestamps
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX), rowIdx, (const char*)pStartTs, false);
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX), rowIdx, (const char*)pEndTs, false);
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX), rowIdx, (const char*)pStartTs,
                false);
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX), rowIdx, (const char*)pEndTs, false);

  // identity columns
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX), rowIdx, (const char*)pUid, false);
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX), rowIdx, (const char*)pGp, false);
  colDataSetVal(taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX), rowIdx, (const char*)pTbName, noTbName);

  pBlock->info.rows++;
}

1505
/*
 * Run the update check for every row of pBlock.
 *
 * Each row timestamp is registered with the update info. When 'out' is true, rows that are
 * detected as updates, or that fall into an already closed (and deleted) single interval window,
 * are recorded in pInfo->pUpdateDataRes; a closed-window row is recorded a second time with its
 * calculated group id when the partition key has to be calculated. Rows that are overdue are
 * skipped entirely when expired data is ignored.
 *
 * 'invertible' is not used by this function.
 */
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
    // a single input row can produce up to two output rows
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
  }

  SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pTsColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* pTsList = (TSKEY*)pTsColInfo->pData;

  bool tableInserted = pInfo->stateStore.updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);

  for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
    TSKEY* pRowTs = pTsList + rowId;

    bool overDue = isOverdue(*pRowTs, &pInfo->twAggSup);
    if (pInfo->igExpired && overDue) {
      continue;
    }

    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    bool        isClosed = false;
    if (tableInserted && overDue) {
      SResultRowInfo dumyInfo;
      dumyInfo.cur.pageId = -1;
      win = getActiveTimeWindow(NULL, &dumyInfo, *pRowTs, &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }

    // must check update info first.
    bool update = pInfo->stateStore.updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, *pRowTs);
    bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
                     isDeletedStreamWindow(&win, pBlock->info.id.groupId, pInfo->pState, &pInfo->twAggSup,
                                           &pInfo->stateStore);

    if (!out || (!update && !closedWin)) {
      continue;
    }

    qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);

    uint64_t gpId = 0;
    appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pRowTs, pRowTs, &pBlock->info.id.uid, &gpId, NULL);

    if (closedWin && pInfo->partitionSup.needCalc) {
      gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pRowTs, pRowTs, &pBlock->info.id.uid, &gpId, NULL);
    }
  }

  if (out && pInfo->pUpdateDataRes->info.rows > 0) {
    SSDataBlock* pUpdateRes = pInfo->pUpdateDataRes;
    pUpdateRes->info.version = pBlock->info.version;
    pUpdateRes->info.dataLoad = 1;
    blockDataUpdateTsWindow(pUpdateRes, 0);
    if (pInfo->partitionSup.needCalc) {
      pUpdateRes->info.type = STREAM_DELETE_DATA;
    } else {
      pUpdateRes->info.type = STREAM_CLEAR;
    }
  }
}
L
Liu Jicong 已提交
1551

1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591
/*
 * Remove the rows of pBlock whose timestamp (column tsIndex) lies outside the additional
 * history window.
 *
 * Nothing is done for the unbounded window [INT64_MIN, INT64_MAX]. Only one bound is applied:
 * when skey is set, rows with ts < skey are removed and ekey is NOT evaluated; otherwise rows
 * with ts > ekey are removed.
 *
 * If the row mask cannot be allocated the block is left unfiltered, an error is logged and
 * terrno is set to TSDB_CODE_OUT_OF_MEMORY.
 */
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
  const bool filterByStart = (pWindow->skey != INT64_MIN);
  const bool filterByEnd = (pWindow->ekey != INT64_MAX);
  if (!filterByStart && !filterByEnd) {
    return;
  }

  // p[i] == true means row i is kept
  bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
  if (p == NULL && pBlock->info.rows > 0) {
    qError("%s failed to allocate the row mask for %" PRId64 " rows, time window filter is skipped", id,
           (int64_t)pBlock->info.rows);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  bool             hasUnqualified = false;
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);

  if (filterByStart) {
    qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey);
    ASSERT(pCol->pData != NULL);
  } else {
    qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey);
  }

  for (int32_t i = 0; i < pBlock->info.rows; ++i) {
    int64_t ts = *(int64_t*)colDataGetData(pCol, i);
    p[i] = filterByStart ? (ts >= pWindow->skey) : (ts <= pWindow->ekey);

    if (!p[i]) {
      hasUnqualified = true;
    }
  }

  if (hasUnqualified) {
    trimDataBlock(pBlock, pBlock->info.rows, p);
  }

  taosMemoryFree(p);
}

// re-build the delete block, ONLY according to the split timestamp
1592
/*
 * Clip the delete ranges stored in pBlock to the given time window and drop the ranges that do
 * not intersect it.
 *
 * Only one bound is applied: when skey is set, range starts below skey are raised to skey and
 * ranges ending before skey are removed; otherwise, when ekey is set, range ends above ekey are
 * lowered to ekey and ranges starting after ekey are removed. With an unbounded window the block
 * is left as it is.
 *
 * The timestamp columns are read as signed TSKEY values so that the comparisons against the
 * signed window keys are also correct for negative timestamps.
 *
 * If the row mask cannot be allocated the block is left untouched, an error is logged and
 * terrno is set to TSDB_CODE_OUT_OF_MEMORY.
 */
static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) {
  int32_t numOfRows = pBlock->info.rows;

  // p[i] == true means row i is kept
  bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
  if (p == NULL && numOfRows > 0) {
    qError("%s failed to allocate the row mask for %d rows, delete block is not rebuilt", id, numOfRows);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  bool    hasUnqualified = false;
  int64_t skey = pWindow->skey;
  int64_t ekey = pWindow->ekey;

  SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           tsStartCol = (TSKEY*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           tsEndCol = (TSKEY*)pSrcEndCol->pData;

  if (pWindow->skey != INT64_MIN) {
    for (int32_t i = 0; i < numOfRows; i++) {
      if (tsStartCol[i] < skey) {
        tsStartCol[i] = skey;
      }

      if (tsEndCol[i] >= skey) {
        p[i] = true;
      } else {  // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
        hasUnqualified = true;
      }
    }
  } else if (pWindow->ekey != INT64_MAX) {
    for (int32_t i = 0; i < numOfRows; ++i) {
      if (tsEndCol[i] > ekey) {
        tsEndCol[i] = ekey;
      }

      if (tsStartCol[i] <= ekey) {
        p[i] = true;
      } else {  // the range starts after the end of the query time window
        hasUnqualified = true;
      }
    }
  }

  if (hasUnqualified) {
    trimDataBlock(pBlock, pBlock->info.rows, p);
    qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
  } else {
    qDebug("%s not update the delete block", id);
  }

  taosMemoryFree(p);
}

/*
 * Convert a block extracted from the WAL (pBlock) into the result block of the stream scan
 * operator (pInfo->pRes).
 *
 * Steps: reserve capacity, copy the block meta data, copy every required output column (columns
 * missing in pBlock are filled with NULL), add the pseudo (tag/tbname) columns, optionally apply
 * the operator filter, apply the additional time window filter, refresh the time window of the
 * result and finally compute the create-table information when rows are left.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of blockDataEnsureCapacity(); in the error case the
 * result block is not modified. A failure while adding the pseudo columns (other than "table does
 * not exist") releases pBlock and leaves through T_LONG_JMP.
 */
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) {
  SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  const char*     id = GET_TASKID(pTaskInfo);

  // never copy column data into a result block that could not be enlarged
  int32_t code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed to reserve %" PRId64 " rows in the result block, code:0x%x", id, (int64_t)pBlock->info.rows,
           code);
    return code;
  }

  pBlockInfo->rows = pBlock->info.rows;
  pBlockInfo->id.uid = pBlock->info.id.uid;
  pBlockInfo->type = STREAM_NORMAL;
  pBlockInfo->version = pBlock->info.version;

  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
  pBlockInfo->id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);

  // todo extract method
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
      continue;
    }

    bool colExists = false;
    for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
      SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
      if (pResCol->info.colId == pColMatchInfo->colId) {
        SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
        colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
        colExists = true;
        break;
      }
    }

    // the required column does not exists in submit block, let's set it to be all null value
    if (!colExists) {
      SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
      colDataSetNNULL(pDst, 0, pBlockInfo->rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
                                  pBlockInfo->rows, id, &pTableScanInfo->base.metaCache);
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reset the error code.
    terrno = 0;
  }

  if (filter) {
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
  }

  // filter the block extracted from WAL files, according to the time window apply additional time window filter
  doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id);
  pInfo->pRes->info.dataLoad = 1;

  blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

  // the create-table information is only needed when there is something to return
  if (pInfo->pRes->info.rows != 0) {
    calBlockTbName(pInfo, pInfo->pRes);
  }
  return TSDB_CODE_SUCCESS;
}
5
54liuyao 已提交
1711

L
Liu Jicong 已提交
1712
/*
 * Fetch the next block for a queue (tmq) scan.
 *
 * While the current offset points to snapshot data, blocks are read through the table scan
 * operator; once the snapshot is exhausted the data reader is closed and the scan switches to the
 * WAL at snapshotVer + 1. In log mode, WAL blocks are converted into pInfo->pRes until a
 * non-empty result is produced.
 *
 * Returns the next non-empty block, or NULL when the task is killed, the WAL reader cannot be
 * positioned, the WAL has no more data, or the offset type is unexpected.
 */
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
  SStreamScanInfo* pInfo = pOperator->info;
  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
  const char*      id = GET_TASKID(pTaskInfo);

  qDebug("start to exec queue scan, %s", id);

  if (isTaskKilled(pTaskInfo)) {
    return NULL;
  }

  if (pStreamInfo->currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pSnapshotRes = doTableScan(pInfo->pTableScanOp);
    if (pSnapshotRes && pSnapshotRes->info.rows > 0) {
      tqOffsetResetToData(&pStreamInfo->currentOffset, pSnapshotRes->info.id.uid, pSnapshotRes->info.window.ekey);
      return pSnapshotRes;
    }

    // snapshot data is exhausted: release the tsdb reader and continue with the WAL
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
    pTSInfo->base.dataReader = NULL;

    int64_t validVer = pStreamInfo->snapshotVer + 1;
    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", validVer);
    if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, validVer, pTaskInfo->id.str) < 0) {
      return NULL;
    }

    tqOffsetResetToLog(&pStreamInfo->currentOffset, validVer);
  }

  if (pStreamInfo->currentOffset.type != TMQ_OFFSET__LOG) {
    qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
    return NULL;
  }

  for (;;) {
    bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id);

    SSDataBlock*       pWalRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
    struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);

    // curVersion move to next
    tqOffsetResetToLog(&pStreamInfo->currentOffset, pWalReader->curVersion);

    if (!hasResult) {
      qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
      return NULL;
    }

    qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pWalRes->info.rows,
           pTaskInfo->streamInfo.currentOffset.version);

    blockDataCleanup(pInfo->pRes);
    STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX};
    setBlockIntoRes(pInfo, pWalRes, &defaultWindow, true);

    // blocks that are completely filtered out are skipped
    if (pInfo->pRes->info.rows > 0) {
      return pInfo->pRes;
    }
  }
}

L
Liu Jicong 已提交
1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793
/*
 * Copy the delete ranges of pSrc that belong to tables queried by the reader of pInfo into pDst.
 *
 * For every kept row the start ts, end ts and uid are copied, while the group id and the
 * calculate-range columns are set to NULL. Afterwards pDst takes over the block info of pSrc,
 * except for the row count (number of kept rows) and its own capacity.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of blockDataEnsureCapacity(); in the error case pDst
 * is left unchanged.
 */
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;

  // do not write into the destination columns if they could not be enlarged
  int32_t code = blockDataEnsureCapacity(pDst, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  uint64_t*        startCol = (uint64_t*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  uint64_t*        endCol = (uint64_t*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  // destination columns are looked up once, not once per row
  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDstGpCol = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t j = 0;  // next free row in pDst
  for (int32_t i = 0; i < rows; i++) {
    if (pInfo->readerFn.tqReaderIsQueriedTable(pReader, uidCol[i])) {
      colDataSetVal(pDstStartCol, j, (const char*)&startCol[i], false);
      colDataSetVal(pDstEndCol, j, (const char*)&endCol[i], false);
      colDataSetVal(pDstUidCol, j, (const char*)&uidCol[i], false);

      colDataSetNULL(pDstGpCol, j);
      colDataSetNULL(pDstCalStartCol, j);
      colDataSetNULL(pDstCalEndCol, j);
      j++;
    }
  }

  // take over the meta data of the source block, but keep the own capacity
  uint32_t cap = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.rows = j;
  pDst->info.capacity = cap;

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828
// for partition by tag
/*
 * Fill the group id column of a special block from the uid column, one group id per row,
 * resolved with getGroupIdByUid().
 *
 * Nothing is done when the partition key has to be calculated from the data
 * (pInfo->partitionSup.needCalc); in that case the block is not accessed at all.
 */
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataSetVal(pGpCol, i, (const char*)&groupId, false);
  }
}

5
54liuyao 已提交
1834
/*
 * Run the update check for pBlock and select the next scan mode from its outcome.
 *
 * Nothing happens when update checking is disabled or no update info exists. Otherwise the
 * maximum data version and the maximum timestamp (endKey) are advanced, the rows are checked,
 * and - if the check produced rows in pInfo->pUpdateDataRes - the scan mode is switched
 * according to the type of that block.
 */
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
  if (pInfo->igCheckUpdate || !pInfo->pUpdateInfo) {
    return;
  }

  pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
  checkUpdateData(pInfo, true, pBlock, true);
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);

  if (pInfo->pUpdateDataRes->info.rows <= 0) {
    return;
  }

  pInfo->updateResIndex = 0;
  switch (pInfo->pUpdateDataRes->info.type) {
    case STREAM_CLEAR:
      pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      break;
    case STREAM_INVERT:
      pInfo->scanMode = STREAM_SCAN_FROM_RES;
      break;
    case STREAM_DELETE_DATA:
      pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
      break;
    default:
      // any other block type keeps the current scan mode
      break;
  }
}

1853 1854 1855 1856 1857 1858
//int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
//  int32_t len = updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
//  *pBuff = taosMemoryCalloc(1, len);
//  updateInfoSerialize(*pBuff, len, pInfo->pUpdateInfo);
//  return len;
//}
L
liuyao 已提交
1859 1860

// other properties are recovered from the execution plan
1861
/*
 * Restore the update info of a stream scan operator from a serialized buffer.
 *
 * pBuff/len describe the serialized update info; an empty buffer is ignored. On success the
 * decoded object is stored in pInfo->pUpdateInfo. When the allocation or the deserialization
 * fails, pInfo is left unchanged and the temporary object is released instead of being leaked.
 *
 * NOTE(review): a previously assigned pInfo->pUpdateInfo is overwritten without being released
 * here - confirm that callers only decode into an operator without update info.
 * NOTE(review): only the container object is freed on a deserialization failure; confirm that
 * updateInfoDeserialize() does not leave owned members behind when it fails.
 */
void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) {
  if (!pBuff || len == 0) {
    return;
  }

  void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
  if (pUpInfo == NULL) {
    qError("failed to allocate the update info while decoding the stream scan state");
    return;
  }

  int32_t code = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to decode the update info of the stream scan state, code:0x%x", code);
    taosMemoryFree(pUpInfo);
    return;
  }

  pInfo->pUpdateInfo = pUpInfo;
}

L
Liu Jicong 已提交
1873 1874
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
1875 1876
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  const char*    id = GET_TASKID(pTaskInfo);
1877

1878
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
L
Liu Jicong 已提交
1879
  SStreamScanInfo* pInfo = pOperator->info;
1880
  SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo;
L
Liu Jicong 已提交
1881

1882
  qDebug("stream scan started, %s", id);
H
Haojun Liao 已提交
1883

1884
  if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1 || pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
L
Liu Jicong 已提交
1885
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
1886
    memcpy(&pTSInfo->base.cond, &pStreamInfo->tableCond, sizeof(SQueryTableDataCond));
1887

1888 1889 1890
    if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
      pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
      pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
1891

1892
      pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
1893 1894
      qDebug("stream recover step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id);
1895
      pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN1;
L
liuyao 已提交
1896
      pStreamInfo->recoverScanFinished = false;
1897
    } else {
1898 1899
      pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
      pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
1900 1901 1902 1903
      pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
      qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
             pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
             pTSInfo->base.cond.twindows.ekey, id);
L
liuyao 已提交
1904
      pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
1905
    }
L
Liu Jicong 已提交
1906

1907
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1908

H
Haojun Liao 已提交
1909
    pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
1910
    pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1911

L
Liu Jicong 已提交
1912 1913 1914 1915
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
  }

L
liuyao 已提交
1916
  if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
L
liuyao 已提交
1917 1918 1919
    if (isTaskKilled(pTaskInfo)) {
      return NULL;
    }
5
54liuyao 已提交
1920 1921 1922 1923 1924 1925 1926

    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        printDataBlock(pInfo->pRecoverRes, "scan recover");
        return pInfo->pRecoverRes;
      } break;
L
liuyao 已提交
1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955
      // case STREAM_SCAN_FROM_UPDATERES: {
      //   generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
      //   prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
      //   pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      //   printDataBlock(pInfo->pUpdateRes, "recover update");
      //   return pInfo->pUpdateRes;
      // } break;
      // case STREAM_SCAN_FROM_DELETE_DATA: {
      //   generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
      //   prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
      //   pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      //   copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
      //   pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
      //   printDataBlock(pInfo->pDeleteDataRes, "recover delete");
      //   return pInfo->pDeleteDataRes;
      // } break;
      // case STREAM_SCAN_FROM_DATAREADER_RANGE: {
      //   SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
      //   if (pSDB) {
      //     STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
      //     pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
      //     checkUpdateData(pInfo, true, pSDB, false);
      //     printDataBlock(pSDB, "scan recover update");
      //     calBlockTbName(pInfo, pSDB);
      //     return pSDB;
      //   }
      //   blockDataCleanup(pInfo->pUpdateDataRes);
      //   pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      // } break;
5
54liuyao 已提交
1956 1957 1958 1959 1960 1961 1962
      default:
        break;
    }

    pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
    if (pInfo->pRecoverRes != NULL) {
      calBlockTbName(pInfo, pInfo->pRecoverRes);
L
liuyao 已提交
1963
      if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
L
liuyao 已提交
1964 1965 1966 1967 1968 1969 1970
        // if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
        TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
        pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
        // } else {
        //   pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer);
        //   doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
        // }
1971
      }
5
54liuyao 已提交
1972 1973
      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
1974
        printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
5
54liuyao 已提交
1975 1976
        return pInfo->pCreateTbRes;
      }
1977

X
Xiaoyu Wang 已提交
1978
      qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
5
54liuyao 已提交
1979 1980
      printDataBlock(pInfo->pRecoverRes, "scan recover");
      return pInfo->pRecoverRes;
L
Liu Jicong 已提交
1981
    }
1982
    pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
L
Liu Jicong 已提交
1983
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
1984
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1985

H
Haojun Liao 已提交
1986
    pTSInfo->base.dataReader = NULL;
1987

H
Haojun Liao 已提交
1988 1989
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
L
Liu Jicong 已提交
1990

1991
    pStreamInfo->recoverScanFinished = true;
L
Liu Jicong 已提交
1992 1993 1994
    return NULL;
  }

5
54liuyao 已提交
1995
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
5
54liuyao 已提交
1996
// TODO: refactor
L
Liu Jicong 已提交
1997
FETCH_NEXT_BLOCK:
L
Liu Jicong 已提交
1998
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
1999
    if (pInfo->validBlockIndex >= total) {
L
Liu Jicong 已提交
2000
      doClearBufferedBlocks(pInfo);
H
Haojun Liao 已提交
2001 2002 2003
      return NULL;
    }

2004 2005 2006
    int32_t  current = pInfo->validBlockIndex++;
    qDebug("process %d/%d input data blocks, %s", current, (int32_t) total, id);

L
Liu Jicong 已提交
2007 2008
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
5
54liuyao 已提交
2009
    if (pBlock->info.parTbName[0]) {
2010
      pAPI->stateStore.streamStatePutParName(pStreamInfo->pState, pBlock->info.id.groupId, pBlock->info.parTbName);
2011
    }
2012

2013
    // TODO move into scan
5
54liuyao 已提交
2014 2015
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
2016
    pBlock->info.dataLoad = 1;
L
fix bug  
liuyao 已提交
2017
    if (pInfo->pUpdateInfo) {
2018
      pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
L
fix bug  
liuyao 已提交
2019
    }
2020

2021
    blockDataUpdateTsWindow(pBlock, 0);
2022
    switch (pBlock->info.type) {
L
Liu Jicong 已提交
2023 2024 2025
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
2026 2027 2028
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
2029
        copyDataBlock(pInfo->pUpdateRes, pBlock);
L
liuyao 已提交
2030
        pInfo->updateResIndex = 0;
2031
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
2032
        pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
2033 2034
      } break;
      case STREAM_DELETE_DATA: {
2035
        printDataBlock(pBlock, "stream scan delete recv");
L
Liu Jicong 已提交
2036
        SSDataBlock* pDelBlock = NULL;
L
Liu Jicong 已提交
2037
        if (pInfo->tqReader) {
L
Liu Jicong 已提交
2038
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
L
Liu Jicong 已提交
2039
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
L
Liu Jicong 已提交
2040 2041
        } else {
          pDelBlock = pBlock;
L
Liu Jicong 已提交
2042
        }
2043

5
54liuyao 已提交
2044
        setBlockGroupIdByUid(pInfo, pDelBlock);
2045
        rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
5
54liuyao 已提交
2046
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
5
54liuyao 已提交
2047 2048 2049 2050 2051 2052
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
2053

2054
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
L
Liu Jicong 已提交
2055
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
2056
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
L
Liu Jicong 已提交
2057
          printDataBlock(pDelBlock, "stream scan delete result");
H
Haojun Liao 已提交
2058 2059
          blockDataDestroy(pDelBlock);

L
Liu Jicong 已提交
2060 2061 2062 2063 2064
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
2065 2066 2067
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
L
Liu Jicong 已提交
2068
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
2069 2070 2071
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
L
Liu Jicong 已提交
2072 2073 2074 2075
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
L
Liu Jicong 已提交
2076
          if (pInfo->pDeleteDataRes->info.rows > 0) {
5
54liuyao 已提交
2077
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
L
Liu Jicong 已提交
2078 2079 2080 2081
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
2082
        }
2083 2084 2085
      } break;
      default:
        break;
5
54liuyao 已提交
2086
    }
2087
    // printDataBlock(pBlock, "stream scan recv");
2088
    return pBlock;
L
Liu Jicong 已提交
2089
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
2090
    qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
5
54liuyao 已提交
2091 2092 2093
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
5
54liuyao 已提交
2094
        doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
5
54liuyao 已提交
2095 2096 2097
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
2098 2099 2100
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
5
54liuyao 已提交
2101
      } break;
2102
      case STREAM_SCAN_FROM_DELETE_DATA: {
2103 2104 2105 2106 2107 2108 2109
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
2110 2111 2112 2113 2114 2115 2116 2117 2118 2119
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
2120
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
5
54liuyao 已提交
2121 2122
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
L
liuyao 已提交
2123
          printDataBlock(pSDB, "stream scan update");
L
Liu Jicong 已提交
2124
          calBlockTbName(pInfo, pSDB);
5
54liuyao 已提交
2125 2126
          return pSDB;
        }
2127
        blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
2128 2129 2130 2131
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
2132
    }
2133

2134
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
5
54liuyao 已提交
2135
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
2136 2137
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
5
54liuyao 已提交
2138 2139
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
2140
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
2141
      pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
2142
      return pInfo->pUpdateRes;
5
54liuyao 已提交
2143
    }
5
54liuyao 已提交
2144

2145
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
H
Haojun Liao 已提交
2146
    int32_t         totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
2147

L
Liu Jicong 已提交
2148
  NEXT_SUBMIT_BLK:
2149
    while (1) {
2150
      if (pInfo->readerFn.tqReaderCurrentBlockConsumed(pInfo->tqReader)) {
2151
        if (pInfo->validBlockIndex >= totalBlocks) {
2152
          pAPI->stateStore.updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
2153 2154 2155 2156 2157 2158 2159 2160 2161 2162
          doClearBufferedBlocks(pInfo);

          qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
          return NULL;
        }

        int32_t      current = pInfo->validBlockIndex++;
        SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);

        qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id);
2163
        if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
2164 2165 2166 2167
          qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id);
          continue;
        }
      }
2168

2169
      blockDataCleanup(pInfo->pRes);
2170

2171 2172
      while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
        SSDataBlock* pRes = NULL;
2173

2174
        int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
2175
        qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, id);
H
Haojun Liao 已提交
2176

2177
        if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
2178
          qDebug("retrieve data failed, try next block in submit block, %s", id);
2179 2180 2181
          continue;
        }

2182 2183
        setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false);
        if (pInfo->pRes->info.rows == 0) {
2184 2185
          continue;
        }
2186

5
54liuyao 已提交
2187 2188
        if (pInfo->pCreateTbRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
2189
          qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id);
5
54liuyao 已提交
2190
          return pInfo->pCreateTbRes;
2191 2192
        }

2193 2194
        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
2195

2196 2197 2198
        int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
        qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
        if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) {
2199 2200
          break;
        }
2201
      }
H
Haojun Liao 已提交
2202

2203
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
5
54liuyao 已提交
2204
        break;
2205 2206
      } else {
        continue;
5
54liuyao 已提交
2207
      }
H
Haojun Liao 已提交
2208 2209 2210 2211
    }

    // record the scan action.
    pInfo->numOfExec++;
2212
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
H
Haojun Liao 已提交
2213

2214
    qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id);
L
Liu Jicong 已提交
2215
    if (pBlockInfo->rows > 0) {
2216
      return pInfo->pRes;
L
Liu Jicong 已提交
2217
    }
2218 2219 2220 2221 2222 2223

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
H
Haojun Liao 已提交
2224
  }
2225

2226
  return NULL;
H
Haojun Liao 已提交
2227 2228
}

H
Haojun Liao 已提交
2229
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2230 2231 2232
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2233 2234 2235
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2236 2237 2238 2239 2240 2241
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2242
/**
 * Fetch the next result for a raw (tmq snapshot) scan.
 *
 * Depending on the task's current offset type this either
 *  - returns the next data block read from the snapshot data reader, or
 *  - advances the scan to the next table / to the WAL and returns NULL, or
 *  - fills pTaskInfo->streamInfo.metaRsp with one table's meta and returns NULL.
 *
 * NOTE: this operator never checks whether the current status is done or not.
 *
 * @param pOperator  raw scan operator; pOperator->info is a SStreamRawScanInfo.
 * @return a data block when snapshot data is available, NULL otherwise.
 */
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*        pAPI = &pTaskInfo->storageAPI;
  SStreamRawScanInfo* pInfo = pOperator->info;

  // metaRspLen != 0 is how the caller tells a meta response from data, so reset it first
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;

  qDebug("tmqsnap doRawScan called");

  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    bool blockReady = false;

    // step the data reader unless only meta was requested
    if (pInfo->dataReader && pInfo->sContext->withMeta != ONLY_META) {
      int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->dataReader, &blockReady);
      if (code) {
        pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
        T_LONG_JMP(pTaskInfo->env, code);
      }
    }

    if (pInfo->dataReader && blockReady) {
      if (isTaskKilled(pTaskInfo)) {
        pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }

      SSDataBlock* pDataBlock = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, NULL);
      if (pDataBlock == NULL) {
        T_LONG_JMP(pTaskInfo->env, terrno);
      }

      qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pDataBlock->info.id.uid);
      // remember how far this table has been consumed
      tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pDataBlock->info.id.uid,
                          pDataBlock->info.window.ekey);
      return pDataBlock;
    }

    // current table is exhausted (or there is no reader): pick the next scan target
    SMetaTableInfo nextTable = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext);
    STqOffsetVal   nextOffset = {0};
    if (nextTable.uid == 0 || pInfo->sContext->withMeta == ONLY_META) {
      // read snapshot done, change to get data from wal
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      tqOffsetResetToLog(&nextOffset, pInfo->sContext->snapVersion + 1);
    } else {
      tqOffsetResetToData(&nextOffset, nextTable.uid, INT64_MIN);
      qDebug("tmqsnap change get data uid:%" PRId64 "", nextTable.uid);
    }

    qStreamPrepareScan(pTaskInfo, &nextOffset, pInfo->sContext->subType);
    tDeleteSchemaWrapper(nextTable.schema);
    return NULL;
  }

  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
    SSnapContext* sContext = pInfo->sContext;
    void*         metaBuf = NULL;
    int32_t       metaLen = 0;
    int16_t       msgType = 0;
    int64_t       tbUid = 0;

    if (pAPI->snapshotFn.getTableInfoFromSnapshot(sContext, &metaBuf, &metaLen, &msgType, &tbUid) < 0) {
      qError("tmqsnap getTableInfoFromSnapshot error");
      taosMemoryFreeClear(metaBuf);
      return NULL;
    }

    if (sContext->queryMeta) {
      // hand the meta buffer over through the task's meta response
      tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, tbUid);
      pTaskInfo->streamInfo.metaRsp.resMsgType = msgType;
      pTaskInfo->streamInfo.metaRsp.metaRspLen = metaLen;
      pTaskInfo->streamInfo.metaRsp.metaRsp = metaBuf;
    } else {
      // change to get data on the next poll request
      STqOffsetVal dataOffset = {0};
      tqOffsetResetToData(&dataOffset, 0, INT64_MIN);
      qStreamPrepareScan(pTaskInfo, &dataOffset, pInfo->sContext->subType);
    }

    return NULL;
  }

  // any other offset type produces nothing here
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2319
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2320
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
2321
  pRawScan->pAPI->tsdReader.tsdReaderClose(pRawScan->dataReader);
2322
  pRawScan->pAPI->snapshotFn.destroySnapshot(pRawScan->sContext);
2323
  tableListDestroy(pRawScan->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2324 2325 2326
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2327 2328 2329
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2330
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2331 2332 2333 2334 2335
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2336 2337
  int32_t code = TSDB_CODE_SUCCESS;

2338
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2339
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2340
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2341 2342
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2343 2344
  }

2345
  pInfo->pTableListInfo = tableListCreate();
wmmhello's avatar
wmmhello 已提交
2346
  pInfo->vnode = pHandle->vnode;
H
Haojun Liao 已提交
2347
  pInfo->pAPI = &pTaskInfo->storageAPI;
wmmhello's avatar
wmmhello 已提交
2348

2349
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2350 2351
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2352

2353
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2354
  return pOperator;
H
Haojun Liao 已提交
2355

L
Liu Jicong 已提交
2356
_end:
H
Haojun Liao 已提交
2357 2358 2359 2360
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2361 2362
}

2363
static void destroyStreamScanOperatorInfo(void* param) {
2364
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
2365

2366
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
2367
    destroyOperator(pStreamScan->pTableScanOp);
2368
  }
2369

2370
  if (pStreamScan->tqReader) {
2371
    pStreamScan->readerFn.tqReaderClose(pStreamScan->tqReader);
2372
  }
H
Haojun Liao 已提交
2373 2374
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2375
  }
C
Cary Xu 已提交
2376 2377
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2378
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2379
  }
C
Cary Xu 已提交
2380

L
Liu Jicong 已提交
2381
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
5
54liuyao 已提交
2382
  cleanupExprSupp(&pStreamScan->tagCalSup);
L
Liu Jicong 已提交
2383

2384
  pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo);
2385 2386 2387 2388
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2389
  blockDataDestroy(pStreamScan->pUpdateDataRes);
5
54liuyao 已提交
2390
  blockDataDestroy(pStreamScan->pCreateTbRes);
2391 2392 2393 2394
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

L
liuyao 已提交
2395 2396
/**
 * Serialize the scan's update info and save it in the stream state under
 * STREAM_SCAN_OP_STATE_NAME.
 *
 * Does nothing when the operator has no stream state or no update info, or when
 * the serialization buffer cannot be allocated.
 *
 * @param pOperator  stream scan operator; pOperator->info is a SStreamScanInfo.
 */
void streamScanReleaseState(SOperatorInfo* pOperator) {
  SStreamScanInfo* pInfo = pOperator->info;
  if (!pInfo->pState) {
    return;
  }
  if (!pInfo->pUpdateInfo) {
    return;
  }

  // first call with a NULL buffer only reports the number of bytes required
  int32_t len = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
  void*   pBuff = taosMemoryCalloc(1, len);
  if (pBuff == NULL) {
    // do not serialize into, or persist from, a NULL buffer
    qError("failed to allocate %d bytes to save stream scan update info", len);
    return;
  }

  pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo);
  pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME),
                                        pBuff, len);
  taosMemoryFree(pBuff);
}

/**
 * Reload the update info saved under STREAM_SCAN_OP_STATE_NAME and combine it
 * with the operator's current update info.
 *
 * - If the current update info has a negative minTS, it is replaced by the
 *   loaded one.
 * - Otherwise the current per-uid map entries are copied over the loaded map,
 *   and the merged map becomes the current one.
 * - If loading fails or there is no current update info, the loaded object is
 *   discarded.
 *
 * @param pOperator  stream scan operator; pOperator->info is a SStreamScanInfo.
 */
void streamScanReloadState(SOperatorInfo* pOperator) {
  SStreamScanInfo* pInfo = pOperator->info;
  if (!pInfo->pState) {
    return;
  }

  void*   pBuff = NULL;
  int32_t len = 0;
  pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME),
                                       &pBuff, &len);

  SUpdateInfo* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
  if (pUpInfo == NULL) {
    // nothing to deserialize into; keep the current update info untouched
    taosMemoryFree(pBuff);
    return;
  }

  int32_t code = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo);
  taosMemoryFree(pBuff);

  if (code == TSDB_CODE_SUCCESS && pInfo->pUpdateInfo) {
    if (pInfo->pUpdateInfo->minTS < 0) {
      // the loaded update info supersedes the current one entirely
      pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
      pInfo->pUpdateInfo = pUpInfo;
    } else {
      pInfo->stateStore.windowSBfDelete(pInfo->pUpdateInfo, 1);
      pInfo->stateStore.windowSBfAdd(pInfo->pUpdateInfo, 1);
      ASSERT(pInfo->pUpdateInfo->minTS > pUpInfo->minTS);
      pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion);

      // copy every current (uid -> ts) entry into the loaded map; current entries win on conflict
      SHashObj* curMap = pInfo->pUpdateInfo->pMap;
      void*     pIte = taosHashIterate(curMap, NULL);
      while (pIte != NULL) {
        size_t   keySize = 0;
        int64_t* pUid = taosHashGetKey(pIte, &keySize);
        taosHashPut(pUpInfo->pMap, pUid, sizeof(int64_t), pIte, sizeof(TSKEY));
        pIte = taosHashIterate(curMap, pIte);
      }

      // swap the merged map in and detach it from pUpInfo before pUpInfo is destroyed
      taosHashCleanup(curMap);
      pInfo->pUpdateInfo->pMap = pUpInfo->pMap;
      pUpInfo->pMap = NULL;
      pInfo->stateStore.updateInfoDestroy(pUpInfo);
    }
  } else {
    pInfo->stateStore.updateInfoDestroy(pUpInfo);
  }
}

2448
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2449
                                            STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2450
  SArray*          pColIds = NULL;
2451 2452
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2453 2454
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
  const char* idstr = pTaskInfo->id.str;
2455

H
Haojun Liao 已提交
2456
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2457
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2458
    tableListDestroy(pTableListInfo);
2459
    goto _error;
H
Haojun Liao 已提交
2460 2461
  }

2462
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2463
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2464

2465
  pInfo->pTagCond = pTagCond;
2466
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2467

2468
  int32_t numOfCols = 0;
2469 2470
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2471
  if (code != TSDB_CODE_SUCCESS) {
2472
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2473 2474
    goto _error;
  }
2475

H
Haojun Liao 已提交
2476
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2477
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2478
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2479
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2480 2481

    int16_t colId = id->colId;
2482
    taosArrayPush(pColIds, &colId);
2483
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2484
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2485
    }
H
Haojun Liao 已提交
2486 2487
  }

L
Liu Jicong 已提交
2488 2489 2490 2491
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2492
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2493 2494
      goto _error;
    }
2495

L
Liu Jicong 已提交
2496 2497
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
2498
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1, &pTaskInfo->storageAPI.functionStore) != 0) {
2499
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2500 2501 2502 2503
      goto _error;
    }
  }

2504 2505
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
5
54liuyao 已提交
2506
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
2507 2508
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2509
      tableListDestroy(pTableListInfo);
2510 2511
      goto _error;
    }
2512
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags, &pTaskInfo->storageAPI.functionStore) != 0) {
2513
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2514
      tableListDestroy(pTableListInfo);
2515 2516 2517 2518
      goto _error;
    }
  }

L
Liu Jicong 已提交
2519
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
H
Haojun Liao 已提交
2520
  if (pInfo->pBlockLists == NULL) {
2521
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2522
    tableListDestroy(pTableListInfo);
2523
    goto _error;
H
Haojun Liao 已提交
2524 2525
  }

5
54liuyao 已提交
2526
  if (pHandle->vnode) {
2527
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
2528 2529 2530 2531
    if (pTableScanOp == NULL) {
      qError("createTableScanOperatorInfo error, errorcode: %d", pTaskInfo->code);
      goto _error;
    }
L
Liu Jicong 已提交
2532
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2533
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2534
      pTSInfo->base.cond.endVersion = pHandle->version;
2535
    }
L
Liu Jicong 已提交
2536

2537
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2538
    int32_t        num = 0;
2539
    tableListGetGroupList(pTableListInfo, 0, &pList, &num);
2540

2541
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2542
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2543
      pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
2544 2545
    }

L
Liu Jicong 已提交
2546 2547
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
2548
      pInfo->tqReader = pAPI->tqReaderFn.tqReaderOpen(pHandle->vnode);
L
Liu Jicong 已提交
2549
      ASSERT(pInfo->tqReader);
2550
    } else {
L
Liu Jicong 已提交
2551 2552
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2553 2554
    }

2555
    pInfo->pUpdateInfo = NULL;
2556
    pInfo->pTableScanOp = pTableScanOp;
2557
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
2558
      pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
2559
    }
L
Liu Jicong 已提交
2560

L
Liu Jicong 已提交
2561
    pInfo->readHandle = *pHandle;
L
Liu Jicong 已提交
2562
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
5
54liuyao 已提交
2563 2564
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
L
Liu Jicong 已提交
2565

L
Liu Jicong 已提交
2566
    // set the extract column id to streamHandle
2567
    pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds);
2568
    SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
2569
    code = pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList, idstr);
L
Liu Jicong 已提交
2570 2571 2572 2573
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
2574

L
Liu Jicong 已提交
2575
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2576
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2577 2578
  } else {
    taosArrayDestroy(pColIds);
2579
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2580
    pColIds = NULL;
5
54liuyao 已提交
2581 2582
  }

2583 2584 2585 2586 2587
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2588 2589 2590 2591 2592
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2593
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2594
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2595
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2596
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2597
  pInfo->groupId = 0;
2598
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2599
  pInfo->pStreamScanOp = pOperator;
2600
  pInfo->deleteDataIndex = 0;
2601
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2602
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2603
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2604
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2605
  pInfo->partitionSup.needCalc = false;
5
54liuyao 已提交
2606 2607
  pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
  pInfo->igExpired = pTableScanNode->igExpired;
2608
  pInfo->twAggSup.maxTs = INT64_MIN;
L
liuyao 已提交
2609
  pInfo->pState = pTaskInfo->streamInfo.pState;
2610 2611
  pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
  pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
L
Liu Jicong 已提交
2612

L
fix bug  
liuyao 已提交
2613 2614 2615 2616
  // for stream
  if (pTaskInfo->streamInfo.pState) {
    void*   buff = NULL;
    int32_t len = 0;
2617 2618
    pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len);
    streamScanOperatorDecode(buff, len, pInfo);
dengyihao's avatar
dengyihao 已提交
2619
    taosMemoryFree(buff);
L
fix bug  
liuyao 已提交
2620
  }
L
liuyao 已提交
2621

L
fix bug  
liuyao 已提交
2622
  setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
L
Liu Jicong 已提交
2623
                  pTaskInfo);
2624
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2625

2626
  __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2627 2628
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
L
liuyao 已提交
2629
  setOperatorStreamStateFn(pOperator, streamScanReleaseState, streamScanReloadState);
2630

H
Haojun Liao 已提交
2631
  return pOperator;
2632

L
Liu Jicong 已提交
2633
_error:
H
Haojun Liao 已提交
2634 2635 2636 2637 2638 2639 2640 2641
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2642 2643
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2644 2645
}

2646
/*
 * Writes one output row (row index `count`) into pRes for the table located at pInfo->curPos of the tag scan's
 * table list.
 *
 * The table entry is looked up by uid through the supplied meta reader. When the lookup fails the reader is
 * cleared and the function leaves through T_LONG_JMP with terrno, so it does not return in that case.
 *
 * For every output expression:
 *   - a scan pseudo column function receives the table name as a varstr;
 *   - anything else is treated as a tag column and its value is extracted from the child-table tags.
 */
static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
                              SStorageAPI* pAPI) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  STagScanInfo*  pScan = pOperator->info;
  SExprInfo*     pExprs = &pOperator->exprSupp.pExprInfo[0];

  STableKeyInfo* pKey = tableListGetInfo(pScan->pTableListInfo, pScan->curPos);
  int32_t        rc = pAPI->metaReaderFn.getTableEntryByUid(mr, pKey->uid);
  tDecoderClear(&mr->coder);
  if (rc != TSDB_CODE_SUCCESS) {
    qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKey->uid, tstrerror(terrno),
           GET_TASKID(pTask));
    pAPI->metaReaderFn.clearReader(mr);
    T_LONG_JMP(pTask->env, terrno);
  }

  char nameBuf[512];
  for (int32_t idx = 0; idx < pOperator->exprSupp.numOfExprs; ++idx) {
    SExprInfo*       pExpr = &pExprs[idx];
    SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, pExpr->base.resSchema.slotId);

    // refactor later
    if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
      STR_TO_VARSTR(nameBuf, mr->me.name);
      colDataSetVal(pCol, count, nameBuf, false);
      continue;
    }

    // it is a tag value
    STagVal tagVal = {0};
    tagVal.cid = pExpr->base.pParam[0].pCol->colId;
    const char* pRaw = pAPI->metaFn.extractTagVal(mr->me.ctbEntry.pTags, pCol->info.type, &tagVal);

    // JSON tags are passed through as-is; other types are converted by tTagValToData
    char* pData = NULL;
    if (pCol->info.type == TSDB_DATA_TYPE_JSON || pRaw == NULL) {
      pData = (char*)pRaw;
    } else {
      pData = tTagValToData((const STagVal*)pRaw, false);
    }

    bool isNull = (pData == NULL) || (pCol->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(pData));
    colDataSetVal(pCol, count, pData, isNull);

    // only the converted buffer of a variable-length, non-JSON tag is released here
    if (pCol->info.type != TSDB_DATA_TYPE_JSON && pRaw != NULL && IS_VAR_DATA_TYPE(((const STagVal*)pRaw)->type) &&
        pData != NULL) {
      taosMemoryFree(pData);
    }
  }
}

2691
/*
 * Next-block function of the tag scan operator.
 *
 * Emits up to resultInfo.capacity rows per call, one row per table of pInfo->pTableListInfo, starting at
 * pInfo->curPos. Returns pInfo->pRes when at least one row was produced, otherwise NULL.
 *
 * When an slimit node is present each call stops after the first table at or beyond the slimit offset, and the
 * block's group id is derived from that table's name.
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  STagScanInfo* pInfo = pOperator->info;
  SSDataBlock*  pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  int32_t size = tableListGetSize(pInfo->pTableListInfo);
  if (size == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  int32_t     count = 0;
  SMetaReader mr = {0};
  pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0, &pAPI->metaFn);

  while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
    // on a meta lookup failure doTagScanOneTable clears `mr` itself and long-jumps out
    doTagScanOneTable(pOperator, pRes, count, &mr, pAPI);
    ++count;
    if (++pInfo->curPos >= size) {
      setOperatorCompleted(pOperator);
    }
    // each table with tbname is a group, hence its own block, but only group when slimit exists for performance reason.
    if (pInfo->pSlimit != NULL) {
      if (pInfo->curPos < pInfo->pSlimit->offset) {
        continue;
      }
      pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name));
      if (pInfo->curPos >= (pInfo->pSlimit->offset + pInfo->pSlimit->limit) - 1) {
        setOperatorCompleted(pOperator);
      }
      break;
    }
  }

  pAPI->metaReaderFn.clearReader(&mr);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = count;
  pOperator->resultInfo.totalRows += count;

  return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

2747
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2748 2749
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2750
  taosArrayDestroy(pInfo->matchInfo.pList);
2751
  pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
D
dapan1121 已提交
2752
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2753 2754
}

S
slzhou 已提交
2755
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
X
Xiaoyu Wang 已提交
2756
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
2757
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
2758 2759 2760 2761 2762
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2763 2764 2765 2766
  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
2767
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
2768 2769 2770
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2771

H
Haojun Liao 已提交
2772 2773
  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
2774 2775 2776
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2777

2778
  pInfo->pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
2779
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2780 2781
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
2782
  pInfo->pSlimit = (SLimitNode*)pPhyNode->node.pSlimit; //TODO: slimit now only indicate group
2783

L
Liu Jicong 已提交
2784 2785
  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2786
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2787 2788
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

L
Liu Jicong 已提交
2789 2790
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
2791 2792

  return pOperator;
2793

2794
_error:
H
Haojun Liao 已提交
2795 2796 2797 2798 2799
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2800

2801
static SSDataBlock* getBlockForTableMergeScan(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2802 2803 2804 2805
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
2806 2807
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;

S
slzhou 已提交
2808
  SSDataBlock*                    pBlock = pInfo->pReaderBlock;
D
dapan1121 已提交
2809
  int32_t                         code = 0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2810

L
Liu Jicong 已提交
2811
  int64_t      st = taosGetTimestampUs();
X
Xiaoyu Wang 已提交
2812
  bool         hasNext = false;
D
dapan1121 已提交
2813

H
Haojun Liao 已提交
2814
  STsdbReader* reader = pInfo->base.dataReader;
D
dapan1121 已提交
2815
  while (true) {
2816
    code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
D
dapan1121 已提交
2817
    if (code != 0) {
2818
      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
2819
      qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
D
dapan1121 已提交
2820 2821 2822 2823 2824 2825
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
2826

H
Haojun Liao 已提交
2827
    if (isTaskKilled(pTaskInfo)) {
2828
      qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
2829
      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
2830
      break;
dengyihao's avatar
opt mem  
dengyihao 已提交
2831 2832 2833
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2834
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2835 2836 2837 2838 2839
    if (!processThisBlock) {
      continue;
    }

    uint32_t status = 0;
2840
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2841
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2842
    if (code != TSDB_CODE_SUCCESS) {
2843
      qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
2844
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2845 2846
    }

2847 2848 2849 2850
    if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
      break;
    }

dengyihao's avatar
opt mem  
dengyihao 已提交
2851 2852 2853 2854 2855
    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

2856
    pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2857

H
Haojun Liao 已提交
2858
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2859
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2860 2861 2862

    return pBlock;
  }
H
Haojun Liao 已提交
2863

dengyihao's avatar
opt mem  
dengyihao 已提交
2864 2865 2866
  return NULL;
}

2867 2868 2869
/*
 * Builds a one-element array of SBlockOrderInfo that orders by the primary timestamp column.
 *
 * @param colMatchInfo array of SColMatchItem; the dstSlotId of the item whose colId is
 *                     PRIMARYKEY_TIMESTAMP_COL_ID is used as the sort slot (slot 0 if none matches)
 * @param order        sort order stored in the order info
 * @return a new array owned by the caller, or NULL with terrno set when allocation fails
 */
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsTargetSlotId = 0;

  // read the size once instead of on every iteration
  int32_t numOfItems = (int32_t)taosArrayGetSize(colMatchInfo);
  for (int32_t i = 0; i < numOfItems; ++i) {
    SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i);
    if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
      tsTargetSlotId = colInfo->dstSlotId;
    }
  }

  SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  if (pList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  SBlockOrderInfo bi = {0};
  bi.order = order;
  bi.slotId = tsTargetSlotId;
  bi.nullFirst = NULL_ORDER_FIRST;

  // a failed push would otherwise hand back an empty sort specification
  if (taosArrayPush(pList, &bi) == NULL) {
    taosArrayDestroy(pList);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  return pList;
}

H
Haojun Liao 已提交
2887
/*
 * Copies a query condition from src into dst, giving dst its own copy of the column list.
 *
 * All other members are copied bitwise.
 * NOTE(review): any further pointer members of SQueryTableDataCond therefore remain shared with src --
 * confirm that this is intended.
 *
 * @return 0 on success; TSDB_CODE_OUT_OF_MEMORY when the column list cannot be allocated, in which case
 *         dst->colList is NULL so that dst never aliases src's list
 */
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
  memcpy((void*)dst, (const void*)src, sizeof(SQueryTableDataCond));

  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  if (dst->colList == NULL && src->numOfCols > 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int i = 0; i < src->numOfCols; i++) {
    dst->colList[i] = src->colList[i];
  }
  return 0;
}
H
Haojun Liao 已提交
2895

2896
/*
 * Starts the merge scan of one table group.
 *
 * Starting at pInfo->tableStartIndex, the run of consecutive tables whose groupId equals pInfo->groupId is
 * determined and stored as [tableStartIndex, tableEndIndex]. A sort handle is then created (a single-source
 * sort when a limit/offset is set, a block timestamp merge otherwise), a data reader is opened over the tables
 * of the group, and the reader is registered as the only sort source.
 *
 * Failures (allocation, reader open, sort open) leave through T_LONG_JMP with the failing code; the function
 * only returns normally with TSDB_CODE_SUCCESS.
 */
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SReadHandle*         pHandle = &pInfo->base.readHandle;
  SStorageAPI*         pAPI = &pTaskInfo->storageAPI;

  // find the last table that still belongs to the current group
  {
    size_t  numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < numOfTables; ++i) {
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  bool    hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
  int64_t mergeLimit = -1;
  if (hasLimit) {
    mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
  }

  size_t szRow = blockDataGetRowSize(pInfo->pResBlock);
  if (hasLimit) {
    pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL,
                                               pTaskInfo->id.str, mergeLimit, szRow + 8,
                                               tsPQSortMemThreshold * 1024 * 1024);
  } else {
    pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
    int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
    pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
                                               pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
  }

  // the handle is used unconditionally below, so a failed creation must stop here
  if (pInfo->pSortHandle == NULL) {
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }

  if (!hasLimit) {
    tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
  }

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;

  STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
  SSortSource*                    ps = taosMemoryCalloc(1, sizeof(SSortSource));
  if (param == NULL || ps == NULL) {
    taosMemoryFree(param);
    taosMemoryFree(ps);
    T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
  }
  param->pOperator = pOperator;

  STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
  int32_t        code =
      pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
                                    (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    // the source has not been handed to the sort handle yet, so it is still owned here
    taosMemoryFree(param);
    taosMemoryFree(ps);
    T_LONG_JMP(pTaskInfo->env, code);
  }

  ps->param = param;
  ps->onlyRef = false;
  tsortAddSource(pInfo->pSortHandle, ps);

  if (numOfTable == 1) {
    setSingleTableMerge(pInfo->pSortHandle);
  } else {
    code = tsortOpen(pInfo->pSortHandle);
  }

  if (code != TSDB_CODE_SUCCESS) {
    // jump with the code that was actually returned rather than whatever terrno currently holds
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Finishes the merge scan of the current table group: folds the sort handle's execution statistics into
 * pInfo->sortExecInfo, closes the data reader if one is open, destroys the sort handle and resets the limit
 * info for the next group. Always returns TSDB_CODE_SUCCESS.
 */
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pScan = pOperator->info;
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  SStorageAPI*         pStore = &pTask->storageAPI;

  // method and buffer reflect the latest group; loops and byte counters accumulate over all groups
  SSortExecInfo  groupStat = tsortGetSortExecInfo(pScan->pSortHandle);
  SSortExecInfo* pTotal = &pScan->sortExecInfo;
  pTotal->sortMethod = groupStat.sortMethod;
  pTotal->sortBuffer = groupStat.sortBuffer;
  pTotal->loops += groupStat.loops;
  pTotal->readBytes += groupStat.readBytes;
  pTotal->writeBytes += groupStat.writeBytes;

  if (NULL != pScan->base.dataReader) {
    pStore->tsdReader.tsdReaderClose(pScan->base.dataReader);
    pScan->base.dataReader = NULL;
  }

  tsortDestroySortHandle(pScan->pSortHandle);
  pScan->pSortHandle = NULL;

  resetLimitInfoForNextGroup(&pScan->limitInfo);
  return TSDB_CODE_SUCCESS;
}

2988 2989
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2990 2991
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2992 2993 2994
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2995
  blockDataCleanup(pResBlock);
S
slzhou 已提交
2996
  STupleHandle* pTupleHandle = NULL;
2997
  while (1) {
S
slzhou 已提交
2998 2999 3000 3001 3002 3003 3004 3005 3006 3007
    while (1) {
      pTupleHandle = tsortNextTuple(pHandle);
      if (pTupleHandle == NULL) {
        break;
      }

      appendOneRowToDataBlock(pResBlock, pTupleHandle);
      if (pResBlock->info.rows >= capacity) {
        break;
      }
3008 3009
    }

S
slzhou 已提交
3010 3011 3012
    if (tsortIsClosed(pHandle)) {
      terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
      T_LONG_JMP(pOperator->pTaskInfo->env, terrno);
3013 3014
    }

S
slzhou 已提交
3015 3016 3017 3018 3019 3020
    bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
    qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
          pInfo->limitInfo.numOfOutputRows);
    if (pTupleHandle == NULL || limitReached || pResBlock->info.rows > 0) {
      break;
    }  
D
dapan1121 已提交
3021
  }
3022
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034
}

/*
 * Next-block function of the table merge scan operator.
 *
 * Tables are processed group by group. On the first call the first group is started; afterwards every call
 * returns the next sorted block of the current group, tagged with that group's id. When a group is drained it
 * is stopped and the following group is started. Returns NULL once the last group is drained (the operator is
 * then marked completed) or when the table list is empty.
 */
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (OP_EXEC_DONE == pOperator->status) {
    return NULL;
  }

  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  STableMergeScanInfo* pScan = pOperator->info;

  int32_t rc = pOperator->fpSet._openFn(pOperator);
  if (rc != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTask->env, rc);
  }

  size_t numOfTables = tableListGetSize(pScan->base.pTableListInfo);

  // hasGroupId doubles as the "first group already started" marker
  if (!pScan->hasGroupId) {
    pScan->hasGroupId = true;

    if (numOfTables == 0) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pScan->tableStartIndex = 0;
    STableKeyInfo* pFirst = (STableKeyInfo*)tableListGetInfo(pScan->base.pTableListInfo, pScan->tableStartIndex);
    pScan->groupId = pFirst->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pScan->tableStartIndex < numOfTables) {
    if (isTaskKilled(pTask)) {
      T_LONG_JMP(pTask->env, pTask->code);
    }

    SSDataBlock* pOut = getSortedTableMergeScanBlockData(pScan->pSortHandle, pScan->pResBlock,
                                                         pOperator->resultInfo.capacity, pOperator);
    if (pOut != NULL) {
      pOut->info.id.groupId = pScan->groupId;
      pOperator->resultInfo.totalRows += pOut->info.rows;
      return pOut;
    }

    // Data of this group are all dumped, let's try the next group
    stopGroupTableMergeScan(pOperator);
    if (pScan->tableEndIndex >= numOfTables - 1) {
      setOperatorCompleted(pOperator);
      break;
    }

    pScan->tableStartIndex = pScan->tableEndIndex + 1;
    pScan->groupId = tableListGetInfo(pScan->base.pTableListInfo, pScan->tableStartIndex)->groupId;
    startGroupTableMergeScan(pOperator);
    resetLimitInfoForNextGroup(&pScan->limitInfo);
  }

  // only reached without a block to hand out
  return NULL;
}

3081
void destroyTableMergeScanOperatorInfo(void* param) {
3082
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
3083
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
3084

3085
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);
H
Haojun Liao 已提交
3086

3087
  pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
D
dapan1121 已提交
3088 3089
  pTableScanInfo->base.dataReader = NULL;

3090
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
3091 3092
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
3093

3094
  destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
3095 3096 3097

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
S
slzhou 已提交
3098
  pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);
3099 3100

  taosArrayDestroy(pTableScanInfo->pSortInfo);
D
dapan1121 已提交
3101
  taosMemoryFreeClear(param);
3102 3103 3104 3105
}

/*
 * Explain callback of the table merge scan operator.
 *
 * Allocates a STableMergeScanExecInfo holding a copy of the operator's block read recorder and sort execution
 * info, and hands it out through pOptrExplain together with its size in len.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the record cannot be allocated; the output
 *         parameters are left untouched in that case
 */
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);

  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->base.readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3118
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
3119
                                                STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
3120 3121 3122 3123 3124
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3125

3126 3127 3128
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
3129
  int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
H
Haojun Liao 已提交
3130
                                     &pInfo->base.matchInfo);
H
Haojun Liao 已提交
3131 3132 3133
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3134

H
Haojun Liao 已提交
3135
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
3136
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3137
    taosArrayDestroy(pInfo->base.matchInfo.pList);
3138 3139 3140 3141
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
3142
    SExprSupp* pSup = &pInfo->base.pseudoSup;
3143
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
3144
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset, &pTaskInfo->storageAPI.functionStore);
3145 3146 3147 3148
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

H
Haojun Liao 已提交
3149 3150 3151 3152 3153 3154
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

H
Haojun Liao 已提交
3155
  pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
H
Haojun Liao 已提交
3156 3157
  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
3158
  pInfo->base.readHandle = *readHandle;
3159

3160 3161
  pInfo->readIdx = -1;

3162 3163
  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;
3164
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
3165

3166
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3167
  pInfo->sample.seed = taosGetTimestampSec();
H
Haojun Liao 已提交
3168 3169 3170 3171 3172 3173

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
3174
  initResultSizeInfo(&pOperator->resultInfo, 1024);
H
Haojun Liao 已提交
3175
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
3176 3177
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

3178
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3179

H
Haojun Liao 已提交
3180
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
3181
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3182
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
3183

S
slzhou 已提交
3184 3185
  pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);

dengyihao's avatar
dengyihao 已提交
3186
  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
A
Alex Duan 已提交
3187 3188
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
3189

L
Liu Jicong 已提交
3190 3191
  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3192
  pOperator->exprSupp.numOfExprs = numOfCols;
3193

3194 3195
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
3196 3197 3198 3199 3200 3201 3202 3203 3204
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
3205 3206 3207 3208

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
3209
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
3210
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3211
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI);
S
slzhou 已提交
3212
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3213
                                                   SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI);
S
slzhou 已提交
3214 3215
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
3216 3217
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
3218 3219 3220 3221 3222 3223 3224
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267 3268 3269 3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

/*
 * Records the output slot ids of the "db_name" and "stable_name" columns found in scanCols.
 *
 * Every node must be a target node wrapping a column node; otherwise TSDB_CODE_QRY_SYS_ERROR is returned.
 * Columns with other names are ignored. A NULL list is accepted and leaves supp untouched.
 */
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, scanCols) {
    if (QUERY_NODE_TARGET != nodeType(pItem)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pItem;
    if (QUERY_NODE_COLUMN != nodeType(pTarget->pExpr)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SColumnNode* pColumn = (SColumnNode*)(pTarget->pExpr);
    if (0 == strcmp(pColumn->colName, GROUP_TAG_DB_NAME)) {
      supp->dbNameSlotId = pTarget->slotId;
    } else if (0 == strcmp(pColumn->colName, GROUP_TAG_STABLE_NAME)) {
      supp->stbNameSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Records the output slot id of the table count function found in pseudoCols.
 *
 * Every node must be a target node wrapping a function node; otherwise TSDB_CODE_QRY_SYS_ERROR is returned.
 * Functions of other types are ignored. A NULL list is accepted and leaves supp untouched.
 */
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, pseudoCols) {
    if (QUERY_NODE_TARGET != nodeType(pItem)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pItem;
    if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SFunctionNode* pFunc = (SFunctionNode*)(pTarget->pExpr);
    if (FUNCTION_TYPE_TABLE_COUNT == pFunc->funcType) {
      supp->tbCountSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Derives the grouping / filtering inputs of the table count scan.
 *
 * Without group tags the database and table name of tableName are copied into the filter fields of supp.
 * With group tags every node must be a column node (TSDB_CODE_QRY_SYS_ERROR otherwise); the "db_name" and
 * "stable_name" columns switch on the corresponding group-by flags.
 */
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags == NULL) {
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
    return TSDB_CODE_SUCCESS;
  }

  SNode* pItem = NULL;
  FOREACH(pItem, groupTags) {
    if (QUERY_NODE_COLUMN != nodeType(pItem)) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SColumnNode* pColumn = (SColumnNode*)pItem;
    if (0 == strcmp(pColumn->colName, GROUP_TAG_DB_NAME)) {
      supp->groupByDbName = true;
    }
    if (0 == strcmp(pColumn->colName, GROUP_TAG_STABLE_NAME)) {
      supp->groupByStbName = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Fills supp for a table count scan in three steps: grouping/filter inputs, slot ids of the group tag columns,
 * and slot id of the count function. The three slot ids are reset to -1 before they are resolved.
 * The first failing step is logged and its code returned.
 */
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t rc = tblCountScanGetInputs(groupTags, tableName, supp);
  if (TSDB_CODE_SUCCESS != rc) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return rc;
  }

  // -1 marks a column that is not part of the output
  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  rc = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (TSDB_CODE_SUCCESS != rc) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return rc;
  }

  rc = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (TSDB_CODE_SUCCESS != rc) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
  }
  return rc;
}
S
shenglian zhou 已提交
3318

S
slzhou 已提交
3319
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3320 3321 3322
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3323
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3324
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3325
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3326 3327 3328 3329 3330 3331 3332 3333 3334

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3335
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3336 3337
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3338 3339 3340
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3341 3342 3343

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3344 3345
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3346 3347 3348 3349 3350 3351 3352 3353 3354 3355 3356
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3357 3358 3359
/**
 * Writes a single result row (row index 0) into pRes and sets pRes->info.rows to 1.
 *
 * Each output column is only written when its slot id in pSupp is not -1:
 *   - dbNameSlotId : db name as a var-string (dbName must be non-empty, enforced by ASSERT)
 *   - stbNameSlotId: stable name as a var-string, or NULL when stbName is the empty string
 *   - tbCountSlotId: the table count
 *
 * Names are copied with tstrncpy (always NUL-terminated) and the var-string length is taken from
 * the copied text, so an over-long input is truncated instead of recording a length that
 * exceeds the local buffer.
 *
 * @param pSupp    slot ids of the output columns
 * @param dbName   NUL-terminated database name
 * @param stbName  NUL-terminated super table name; "" means "no super table"
 * @param count    number of tables to report
 * @param pRes     result block that receives the row
 */
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
    ASSERT(strlen(dbName));
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);

    char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);

    // length of what was actually copied, not of the (possibly longer) source
    varDataSetLen(varDbName, strlen(varDataVal(varDbName)));
    colDataSetVal(colInfoData, 0, varDbName, false);
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
    if (strlen(stbName) != 0) {
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      tstrncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
      varDataSetLen(varStbName, strlen(varDataVal(varStbName)));
      colDataSetVal(colInfoData, 0, varStbName, false);
    } else {
      colDataSetNULL(colInfoData, 0);
    }
  }

  if (pSupp->tbCountSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
    colDataSetVal(colInfoData, 0, (char*)&count, false);
  }
  pRes->info.rows = 1;
}

S
slzhou 已提交
3389
// Produces the next table-count block for the two system databases.
// The table numbers come from getInfosDbMeta()/getPerfDbMeta(); the grouped or the
// filtered builder is chosen depending on whether any group-by flag is set in the supp.
// Returns pInfo->pRes when it holds at least one row, otherwise NULL.
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
  STableCountScanSupp* pScanSupp = &pInfo->supp;
  SSDataBlock*         pBlock = pInfo->pRes;

  size_t numInfoDbTables = 0;
  getInfosDbMeta(NULL, &numInfoDbTables);

  size_t numPerfDbTables = 0;
  getPerfDbMeta(NULL, &numPerfDbTables);

  if (pScanSupp->groupByDbName || pScanSupp->groupByStbName) {
    buildSysDbGroupedTableCount(pOperator, pInfo, pScanSupp, pBlock, numInfoDbTables, numPerfDbTables);
  } else {
    buildSysDbFilterTableCount(pOperator, pScanSupp, pBlock, numInfoDbTables, numPerfDbTables);
  }

  if (pBlock->info.rows > 0) {
    return pBlock;
  }
  return NULL;
}

S
slzhou 已提交
3407 3408 3409 3410 3411 3412 3413 3414 3415 3416 3417 3418 3419 3420 3421 3422
// Non-grouped system-db path: emits at most one row, selected by the db-name filter.
//   filter == information_schema  -> count of information_schema tables
//   filter == performance_schema  -> count of performance_schema tables
//   filter is empty               -> sum of both
//   anything else                 -> no row
// The operator is always marked completed afterwards.
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  const char* pFilter = pSupp->dbNameFilter;

  if (0 == strcmp(pFilter, TSDB_INFORMATION_SCHEMA_DB)) {
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (0 == strcmp(pFilter, TSDB_PERFORMANCE_SCHEMA_DB)) {
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  } else if (pFilter[0] == '\0') {
    fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
  }

  setOperatorCompleted(pOperator);
}

// Grouped system-db path: one call emits one group, driven by pInfo->currGrpIdx.
//   index 0 -> information_schema, index 1 -> performance_schema, anything else -> operator completed.
// The group id is derived from the db name when grouping by db name, otherwise from the
// empty string. currGrpIdx is advanced on every call.
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  char*  sysDbName = NULL;
  size_t sysDbTableNum = 0;

  switch (pInfo->currGrpIdx) {
    case 0:
      sysDbName = TSDB_INFORMATION_SCHEMA_DB;
      sysDbTableNum = infodbTableNum;
      break;
    case 1:
      sysDbName = TSDB_PERFORMANCE_SCHEMA_DB;
      sysDbTableNum = perfdbTableNum;
      break;
    default:
      break;
  }

  if (sysDbName == NULL) {
    // both system databases have been reported already
    setOperatorCompleted(pOperator);
  } else {
    uint64_t grpId = 0;
    if (pSupp->groupByDbName) {
      grpId = calcGroupId(sysDbName, strlen(sysDbName));
    } else {
      grpId = calcGroupId("", 0);
    }

    pRes->info.id.groupId = grpId;
    fillTableCountScanDataBlock(pSupp, sysDbName, "", sysDbTableNum, pRes);
  }

  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3448
// "next" function of the table-count scan operator.
// Clears the result block, returns NULL once the operator is done, and otherwise dispatches:
// a non-NULL readHandle.mnd selects the system-db builder, else the vnode builder is used.
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
  STableCountScanOperatorInfo* pScanInfo = pOperator->info;
  SSDataBlock*                 pBlock = pScanInfo->pRes;

  // the block is reset before the status check, exactly as callers have always observed
  blockDataCleanup(pBlock);

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  if (pScanInfo->readHandle.mnd == NULL) {
    return buildVnodeDbTableCount(pOperator, pScanInfo, &pScanInfo->supp, pBlock);
  }

  return buildSysDbTableCount(pOperator, pScanInfo);
}

// Vnode path: resolves the short db name of the vnode behind the read handle and then
// delegates to the grouped or the filtered builder.
// Returns pRes when it holds at least one row, otherwise NULL.
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
  SStorageAPI* pStoreApi = &pOperator->pTaskInfo->storageAPI;

  // ask the storage layer for the db name string and the vgroup id
  const char* fullDbName = NULL;
  int32_t     vnodeId = 0;
  pStoreApi->metaFn.getBasicInfo(pInfo->readHandle.vnode, &fullDbName, &vnodeId, NULL, NULL);

  // parse it as an account + db name and extract the db part
  SName parsedName = {0};
  char  shortDbName[TSDB_DB_NAME_LEN] = {0};
  tNameFromString(&parsedName, fullDbName, T_NAME_ACCT | T_NAME_DB);
  tNameGetDbName(&parsedName, shortDbName);

  if (pSupp->groupByDbName || pSupp->groupByStbName) {
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vnodeId, shortDbName);
  } else {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, shortDbName);
  }

  if (pRes->info.rows > 0) {
    return pRes;
  }
  return NULL;
}

// Grouped vnode path; one call emits at most one group.
//  - Not grouping by stable name: a single row with the vnode's table count, grouped by db
//    name, after which the operator is completed.
//  - Grouping by stable name: the super table uid list is loaded lazily on first use; indexes
//    [0, size) each report one super table, index == size reports the group produced by
//    buildVnodeGroupedNtbTableCount, and any later index completes the operator.
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  SStorageAPI* pStoreApi = &pOperator->pTaskInfo->storageAPI;

  if (!pSupp->groupByStbName) {
    pRes->info.id.groupId = calcGroupId(dbName, strlen(dbName));

    int64_t numOfDbTables = 0;
    pStoreApi->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &numOfDbTables, NULL);
    fillTableCountScanDataBlock(pSupp, dbName, "", numOfDbTables, pRes);
    setOperatorCompleted(pOperator);
    return;
  }

  if (pInfo->stbUidList == NULL) {
    pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
    int32_t listRet = pStoreApi->metaFn.storeGetTableList(pInfo->readHandle.vnode, TSDB_SUPER_TABLE, pInfo->stbUidList);
    if (listRet < 0) {
      // failure is only logged; the scan continues with whatever the list contains
      qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
    }
  }

  if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
    tb_uid_t* pUid = (tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
    tb_uid_t  superUid = *pUid;
    buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, superUid, pStoreApi);
    pInfo->currGrpIdx++;
    return;
  }

  if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
    buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName, pStoreApi);
    pInfo->currGrpIdx++;
    return;
  }

  setOperatorCompleted(pOperator);
}

// Non-grouped vnode path: emits exactly one row and completes the operator.
// When both a db-name filter and a stable-name filter are present, the row carries the number
// of child tables of that super table; in every other case it carries the table count
// reported by getBasicInfo() for the vnode.
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  SStorageAPI* pStoreApi = &pOperator->pTaskInfo->storageAPI;

  if (strlen(pSupp->dbNameFilter) != 0 && strlen(pSupp->stbNameFilter) != 0) {
    // look the super table up by name; uid stays 0 if the lookup does not set it
    uint64_t superUid = 0;
    pStoreApi->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &superUid);

    int64_t childCount = 0;
    pStoreApi->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, superUid, &childCount);

    fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, childCount, pRes);
  } else {
    int64_t vnodeTableCount = 0;
    pStoreApi->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &vnodeTableCount, NULL);
    fillTableCountScanDataBlock(pSupp, dbName, "", vnodeTableCount, pRes);
  }

  setOperatorCompleted(pOperator);
}

static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3551
                                           SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI) {
S
slzhou 已提交
3552
  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
D
dapan1121 已提交
3553 3554 3555
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
  }
X
Xiaoyu Wang 已提交
3556

S
slzhou 已提交
3557 3558
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;
3559

3560
  int64_t numOfTables = 0;
3561
  pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, NULL, &numOfTables);
H
Haojun Liao 已提交
3562

3563 3564
  if (numOfTables != 0) {
    fillTableCountScanDataBlock(pSupp, dbName, "", numOfTables, pRes);
3565
  }
S
slzhou 已提交
3566 3567 3568
}

static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3569
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI) {
S
slzhou 已提交
3570
  char stbName[TSDB_TABLE_NAME_LEN] = {0};
3571
  pAPI->metaFn.getTableNameByUid(pInfo->readHandle.vnode, stbUid, stbName);
S
slzhou 已提交
3572 3573

  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
D
dapan1121 已提交
3574
  if (pSupp->groupByDbName) {
H
Haojun Liao 已提交
3575
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, varDataVal(stbName));
D
dapan1121 已提交
3576
  } else {
H
Haojun Liao 已提交
3577
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s", varDataVal(stbName));
D
dapan1121 已提交
3578
  }
X
Xiaoyu Wang 已提交
3579

S
slzhou 已提交
3580 3581 3582
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;

H
Haojun Liao 已提交
3583 3584
  int64_t ctbNum = 0;
  int32_t code = pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, stbUid, &ctbNum);
H
Haojun Liao 已提交
3585
  fillTableCountScanDataBlock(pSupp, dbName, varDataVal(stbName), ctbNum, pRes);
S
shenglian zhou 已提交
3586 3587 3588
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3589
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3590 3591
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3592
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3593
  taosMemoryFreeClear(param);
3594
}