scanoperator.c 132.1 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorInt.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
33 34
#include "operator.h"
#include "querytask.h"
H
Haojun Liao 已提交
35

36 37 38
#include "storageapi.h"
#include "wal.h"

D
dapan1121 已提交
39 40
// File-scope flag with external linkage, initialised to 0. It is not read anywhere in the visible part of this
// file; presumably a debug switch consulted by other scan code -- TODO confirm against its users.
int32_t scanDebug = 0;

X
Xiaoyu Wang 已提交
41
// Table-count constant for the multi-reader mode; not referenced in the visible part of this file.
#define MULTI_READER_MAX_TABLE_NUM 5000

// Mark a scan info object (anything with a scanFlag member) as running the reverse scan.
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)

// Flip an order lvalue in place: ASC becomes DESC, anything else becomes ASC. Evaluates (n) twice.
#define SWITCH_ORDER(n) ((n) = (((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))

// Operator name string for the stream scan operator.
#define STREAM_SCAN_OP_NAME "StreamScanOperator"
45

H
Haojun Liao 已提交
46 47 48 49 50 51 52 53 54 55
// Pairs a file-block load recorder with a sort execution-info record. Not used in the visible part of this file;
// presumably the statistics payload reported by the table merge scan operator -- TODO confirm.
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;  // same recorder type the table scan keeps as its readRecorder
  SSortExecInfo          sortExecInfo;
} STableMergeScanExecInfo;

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
  uint64_t       uid;
  SSDataBlock*   inputBlock;
D
dapan1121 已提交
56 57
  bool           multiReader;
  STsdbReader*   dataReader;
H
Haojun Liao 已提交
58 59
} STableMergeScanSortSourceParam;

60 61 62 63 64 65 66 67 68 69
// Operator state for the table count scan. Not used in the visible part of this file; field notes other than the
// original one on stbUidList are inferred from names -- TODO confirm.
typedef struct STableCountScanOperatorInfo {
  SReadHandle  readHandle;
  SSDataBlock* pRes;  // presumably the result block returned to the caller

  STableCountScanSupp supp;

  int32_t currGrpIdx;  // presumably the index of the group currently being emitted
  SArray* stbUidList;  // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;

L
Liu Jicong 已提交
70
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);
71

H
Haojun Liao 已提交
72
/**
 * Decide whether the current data block takes part in the scan when block sampling is requested.
 *
 * Sampling is compiled out at the moment, so every block is accepted. The disabled implementation accepted all
 * blocks when pInfo->sampleRatio == 1 and otherwise drew taosRandR(&pInfo->seed) and kept the block when
 * (value % (uint32_t)(1 / sampleRatio)) == 0.
 *
 * @param pInfo sampling settings; currently unused
 * @return always true
 */
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
  (void)pInfo;
  return true;
}

85
/**
 * Flip the scan order stored in each of the first numOfOutput function contexts (ASC <-> DESC).
 *
 * @param pCtx        array of function contexts; not touched when numOfOutput <= 0
 * @param numOfOutput number of contexts to update
 */
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SWITCH_ORDER(pCtx[idx].order);
    idx += 1;
  }
}

91 92 93 94 95 96 97 98 99
/**
 * Move the time window *tw to the next window in scan direction.
 *
 * For every unit except month ('n') and year ('y') the start key moves by sliding * factor and the end key becomes
 * skey + interval - 1. For month/year units the start key is converted to local calendar time, shifted by the
 * interval expressed in months (interval * 12 for 'y') in scan direction, and the end key is placed one tick before
 * the start of the window that follows. Note that the calendar path steps by interval, not by sliding.
 *
 * @param pInterval interval definition (unit, interval, sliding, precision)
 * @param tw        window to advance, updated in place
 * @param order     scan order used to derive the direction factor
 */
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order);
  const bool    isCalendarUnit = (pInterval->intervalUnit == 'n') || (pInterval->intervalUnit == 'y');

  if (!isCalendarUnit) {
    tw->skey += pInterval->sliding * factor;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // express the interval in months
  int64_t interval = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    interval *= 12;
  }

  // convert the window start to seconds and break it down into local calendar fields
  int64_t key = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;
  time_t  t = (time_t)key;

  struct tm tm;
  taosLocalTime(&t, &tm, NULL);

  // start of the next window
  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + interval * factor);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // start of the window after that, minus one tick, is the end of the next window
  mon = (int)(mon + interval);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
  tw->ekey -= 1;
}

124
/**
 * Check whether a data block's time range reaches into more than the single interval window that contains its
 * first key in scan direction (skey for ascending order, ekey otherwise).
 *
 * @param pInterval  interval definition of the upstream operator; interval == 0 means there is none
 * @param pBlockInfo block whose [window.skey, window.ekey] range is tested
 * @param order      scan order
 * @return true when the block extends past the first aligned window or a following window starts inside the block;
 *         false when there is no interval or the block lies inside one window only
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  // 0 by default, which means the upstream operator is not an interval operator.
  if (pInterval->interval == 0) {
    return false;
  }

  STimeWindow w = {0};

  if (order == TSDB_ORDER_ASC) {
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
    ASSERT(w.ekey >= pBlockInfo->window.skey);

    // the block continues after the end of the first window
    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

    for (;;) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.skey > pBlockInfo->window.ekey) {
        return false;
      }

      ASSERT(w.ekey > pBlockInfo->window.ekey);
      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
        return true;
      }
    }
  }

  // descending order: mirror image of the logic above, anchored at the block's end key
  w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
  ASSERT(w.skey <= pBlockInfo->window.ekey);

  if (w.skey > pBlockInfo->window.skey) {
    return true;
  }

  for (;;) {
    getNextTimeWindow(pInterval, &w, order);
    if (w.ekey < pBlockInfo->window.skey) {
      return false;
    }

    ASSERT(w.skey < pBlockInfo->window.skey);
    if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
      return true;
    }
  }
}

175 176 177 178 179 180 181 182 183 184 185
/**
 * Let the table scanner look up the temporary aggregate result row that the upstream aggregate keeps for a group.
 *
 * @param pOperator operator to query; only QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN operators are supported
 * @param groupId   group whose result row is requested
 * @param pPage     out: buffer page that holds the row; written only when the group has an entry in the result-row
 *                  hash table, and may then be NULL if the page could not be obtained
 * @return pointer to the result row inside *pPage, or NULL when the operator is not a table scan, the group has no
 *         entry yet, or the page could not be obtained
 */
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  STableScanInfo* pScanInfo = pOperator->info;

  // build the lookup key of the group in the result-row hash table
  int64_t keyBuf[2] = {0};
  SET_RES_WINDOW_KEY((char*)keyBuf, &groupId, sizeof(groupId), groupId);

  SResultRowPosition* pPos = (SResultRowPosition*)tSimpleHashGet(
      pScanInfo->base.pdInfo.pAggSup->pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
  if (pPos == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pScanInfo->base.pdInfo.pAggSup->pResultBuf, pPos->pageId);
  return (*pPage == NULL) ? NULL : (SResultRow*)((char*)(*pPage) + pPos->offset);
}

201 202 203 204 205 206 207 208 209 210 211 212 213 214
/**
 * Remember that the table identified by uid needs no further blocks in the current scan round.
 *
 * The ignore table is created lazily, sized by the number of tables in the scan's table list, with updates enabled
 * so that inserting an already present uid overwrites the stored value. The stored value is the current scanTimes.
 *
 * @param pTableScanInfo table scan state that owns pIgnoreTables
 * @param uid            uid of the table to ignore
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the hash table or the new entry cannot be allocated
 */
static int32_t insertTableToScanIgnoreList(STableScanInfo* pTableScanInfo, uint64_t uid) {
  if (NULL == pTableScanInfo->pIgnoreTables) {
    int32_t tableNum = (int32_t)taosArrayGetSize(pTableScanInfo->base.pTableListInfo->pTableList);
    pTableScanInfo->pIgnoreTables =
        taosHashInit(tableNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (NULL == pTableScanInfo->pIgnoreTables) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // the insertion result used to be dropped, so a failed insert was reported as success
  int32_t ret = taosHashPut(pTableScanInfo->pIgnoreTables, &uid, sizeof(uid), &pTableScanInfo->scanTimes,
                            sizeof(pTableScanInfo->scanTimes));
  if (ret != 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

215 216
/**
 * Use the intermediate results of the pushed-down aggregate to decide whether the block still has to be loaded.
 *
 * Every pushed-down function is asked (fmFuncDynDataRequired) whether it needs data from the block's time window.
 * Only if all of them answer FUNC_DATA_REQUIRED_NOT_LOAD is *status set to FUNC_DATA_REQUIRED_NOT_LOAD and the
 * block's table added to the scan's ignore list. In every other case *status is left untouched.
 *
 * @param pOperator  table scan operator
 * @param pBlockInfo info of the candidate block (group id, uid, time window)
 * @param status     in/out: required-data status of the block
 * @return TSDB_CODE_SUCCESS, or the error returned while recording the table in the ignore list
 */
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScanInfo = pOperator->info;
  SExprSupp*      pExprSup = pScanInfo->base.pdInfo.pExprSup;

  // nothing was pushed down, so there is nothing to prune with
  if (pExprSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pPage = NULL;
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  bool dataRequired = false;
  for (int32_t idx = 0; idx < pExprSup->numOfExprs && !dataRequired; ++idx) {
    int32_t              funcId = pExprSup->pCtx[idx].functionId;
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, idx, pExprSup->rowEntryInfoOffset);

    dataRequired = (fmFuncDynDataRequired(funcId, pEntry, &pBlockInfo->window) != FUNC_DATA_REQUIRED_NOT_LOAD);
  }

  // the result row is no longer needed, hand the buffer page back
  releaseBufPage(pScanInfo->base.pdInfo.pAggSup->pResultBuf, pPage);

  if (dataRequired) {
    return TSDB_CODE_SUCCESS;
  }

  *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  return insertTableToScanIgnoreList(pScanInfo, pBlockInfo->id.uid);
}

H
Haojun Liao 已提交
256
/**
 * Evaluate the filter against the block's per-column aggregate (SMA) information.
 *
 * @param pFilterInfo filter to evaluate; NULL means "no filter"
 * @param pColsAgg    per-column aggregate info of the block; NULL means "not available"
 * @param numOfCols   number of columns
 * @param numOfRows   number of rows in the block
 * @return true when the block must be kept (also when filter or aggregate info is missing); otherwise the result
 *         of filterRangeExecute()
 */
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                               int32_t numOfRows) {
  if (pFilterInfo == NULL || pColsAgg == NULL) {
    return true;
  }

  return filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
}

H
Haojun Liao 已提交
266
/**
 * Ask the reader for the aggregate (SMA) information of the current block.
 *
 * A reader error does not return: the function long-jumps to the task's error handler with the error code.
 *
 * @param pTableScanInfo scan state that owns the data reader
 * @param pBlock         block that receives the aggregate information
 * @param pTaskInfo      owning task (storage API and long-jump environment)
 * @return the "all columns have aggregate info" flag reported by the reader (initialised to true before the call)
 */
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;
  bool         hasAllAgg = true;

  int32_t code = pAPI->tsdReader.tsdReaderRetrieveBlockSMAInfo(pTableScanInfo->dataReader, pBlock, &hasAllAgg);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return hasAllAgg;
}

H
Haojun Liao 已提交
281
/**
 * Fill the tag and pseudo columns (e.g. tbname) of the block when the scan has pseudo expressions.
 *
 * TSDB_CODE_PAR_TABLE_NOT_EXIST is tolerated because the table may have been dropped while the scan is running.
 * Any other error long-jumps to the task's error handler. terrno is reset afterwards.
 *
 * @param pTableScanInfo scan state (pseudo expressions, read handle, meta cache)
 * @param pBlock         block to fill
 * @param pTaskInfo      owning task
 * @param rows           number of rows to fill
 */
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
                               int32_t rows) {
  SExprSupp* pPseudoSup = &pTableScanInfo->pseudoSup;
  if (pPseudoSup->numOfExprs <= 0) {
    return;
  }

  int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pPseudoSup->pExprInfo, pPseudoSup->numOfExprs,
                                        pBlock, rows, GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);

  // ignore the table not exists error, since this table may have been dropped during the scan procedure.
  bool tolerated = (code == TSDB_CODE_SUCCESS) || (code == TSDB_CODE_PAR_TABLE_NOT_EXIST);
  if (!tolerated) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // reset the error code.
  terrno = 0;
}

298
/**
 * Apply the remaining OFFSET and the LIMIT of a query to one result block.
 *
 * While an offset remains, rows are dropped from the front of the block; a block that is consumed completely by the
 * offset is emptied. Afterwards the block is cut to the number of rows still allowed by the limit and the
 * output-row counter is advanced.
 *
 * @param pLimitInfo limit/offset bookkeeping, updated in place
 * @param pBlock     block to trim, updated in place
 * @param pTaskInfo  owning task, used for the log id only
 * @return true when the limit has been reached with this block; false otherwise (including the case where the whole
 *         block was skipped by the offset)
 */
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  SLimit*     pLimit = &pLimitInfo->limit;
  const char* id = GET_TASKID(pTaskInfo);

  if (pLimitInfo->remainOffset > 0) {
    if (pLimitInfo->remainOffset >= pBlock->info.rows) {
      // the whole block falls into the offset range
      pLimitInfo->remainOffset -= pBlock->info.rows;
      blockDataEmpty(pBlock);
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
      return false;
    }

    blockDataTrimFirstRows(pBlock, pLimitInfo->remainOffset);
    pLimitInfo->remainOffset = 0;
  }

  const bool limitReached =
      (pLimit->limit != -1) && (pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows));
  if (limitReached) {
    // limit the output rows
    blockDataKeepFirstNRows(pBlock, (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows));
  }

  pLimitInfo->numOfOutputRows += pBlock->info.rows;

  if (limitReached) {
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
  }
  return limitReached;
}

H
Haojun Liao 已提交
328
/**
 * Decide how much of the current block is needed and load exactly that.
 *
 * Processing order:
 *  1. start from the scan's configured load flag; a filter on the operator or a block that overlaps several
 *     interval windows forces FUNC_DATA_REQUIRED_DATA_LOAD;
 *  2. FILTEROUT / NOT_LOAD / SMA_LOAD are served without loading the rows (SMA_LOAD falls back to a full load when
 *     not every column has aggregate info);
 *  3. with a filter present, try to discard the block by its aggregate (SMA) info;
 *  4. try to discard the block using the pushed-down aggregate's current results (dynamic prune);
 *  5. load the rows, fill tag/pseudo columns, apply the filter and the limit/offset.
 *
 * Every path that returns TSDB_CODE_SUCCESS without loading rows releases the reader's current block.
 *
 * @param pOperator      table scan operator
 * @param pTableScanInfo base scan state (reader, cost recorder, limit info, ...)
 * @param pBlock         result block, filled in place
 * @param status         out: final required-data status; FUNC_DATA_REQUIRED_FILTEROUT means "skip this block",
 *                       FUNC_DATA_REQUIRED_ALL_FILTEROUT means every table of the scan is on the ignore list
 * @return TSDB_CODE_SUCCESS, the error of the dynamic prune step, or terrno when the rows could not be retrieved.
 *         Failures in the SMA and tag helpers long-jump instead of returning.
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*            pAPI = &pTaskInfo->storageAPI;
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlock->info.rows;

  bool loadSMA = false;
  *status = pTableScanInfo->dataBlockLoadFlag;
  if (pOperator->exprSupp.pFilterInfo != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
    (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  SDataBlockInfo* pBlockInfo = &pBlock->info;
  taosMemoryFreeClear(pBlock->pBlockAgg);

  if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
    qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
           pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->filterOutBlocks += 1;
    // NOTE(review): the rows of this block were already added to totalRows above -- confirm the second add is wanted
    pCost->totalRows += pBlock->info.rows;
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows,
           pBlockInfo->id.uid);
    doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
    pCost->skipBlocks += 1;
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
    return TSDB_CODE_SUCCESS;
  } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
    pCost->loadBlockStatis += 1;
    loadSMA = true;  // mark the operation of load sma;
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {  // block statistics are available for all columns, no need to load the rows
      qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
      return TSDB_CODE_SUCCESS;
    } else {  // not every column has block statistics, load the data block instead
      qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
    }
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
    bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
    if (success) {
      size_t size = taosArrayGetSize(pBlock->pDataBlock);
      bool   keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
      if (!keep) {
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
               GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
        pCost->filterOutBlocks += 1;
        (*status) = FUNC_DATA_REQUIRED_FILTEROUT;

        pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results; a failure here (e.g. out of memory while recording the
  // table in the ignore list) must be reported instead of being dropped
  int32_t code = doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (code != TSDB_CODE_SUCCESS) {
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
    return code;
  }

  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;
    pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);

    // once every table of the scan is on the ignore list, the remaining blocks can be skipped altogether
    STableScanInfo* p1 = pOperator->info;
    if (taosHashGetSize(p1->pIgnoreTables) == taosArrayGetSize(p1->base.pTableListInfo->pTableList)) {
      *status = FUNC_DATA_REQUIRED_ALL_FILTEROUT;
    } else {
      *status = FUNC_DATA_REQUIRED_FILTEROUT;
    }
    return TSDB_CODE_SUCCESS;
  }

  pCost->totalCheckedRows += pBlock->info.rows;
  pCost->loadBlocks += 1;

  SSDataBlock* p = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (p == NULL) {
    return terrno;
  }

  ASSERT(p == pBlock);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);

  // restore the previous value; the rows that survive filter and limit are added back at the end
  pCost->totalRows -= pBlock->info.rows;

  if (pOperator->exprSupp.pFilterInfo != NULL) {
    int64_t st = taosGetTimestampUs();
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);

    double el = (taosGetTimestampUs() - st) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;

    if (pBlock->info.rows == 0) {
      pCost->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
  }

  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo);
  if (limitReached) {  // set operator flag is done
    setOperatorCompleted(pOperator);
  }

  pCost->totalRows += pBlock->info.rows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
458
/**
 * Switch the scan state from ascending to descending order: flag the scan as a reverse scan, flip the order of the
 * given function contexts, set the query condition order to DESC and swap the start/end key of its time window.
 *
 * @param pTableScanInfo scan state to update
 * @param pCtx           function contexts whose order is flipped
 * @param numOfOutput    number of entries in pCtx
 */
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  pTableScanInfo->scanFlag = REVERSE_SCAN;
  switchCtxOrder(pCtx, numOfOutput);

  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
  TSWAP(pTableScanInfo->cond.twindows.skey, pTableScanInfo->cond.twindows.ekey);
}

467 468
typedef struct STableCachedVal {
  const char* pName;
469
  STag*       pTags;
470 471
} STableCachedVal;

472 473 474 475 476 477 478 479 480 481 482
static void freeTableCachedVal(void* param) {
  if (param == NULL) {
    return;
  }

  STableCachedVal* pVal = param;
  taosMemoryFree((void*)pVal->pName);
  taosMemoryFree(pVal->pTags);
  taosMemoryFree(pVal);
}

H
Haojun Liao 已提交
483 484
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
485
  pVal->pName = taosStrdup(pMetaReader->me.name);
H
Haojun Liao 已提交
486 487 488 489
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
490
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
491 492 493 494 495 496 497
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

498 499
// Deleter callback handed to taosLRUCacheInsert() for the table meta cache: frees the STableCachedVal stored as the
// entry value. The key and its length (const void *key, size_t keyLen) are not used.
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) { freeTableCachedVal(value); }
500

501 502 503 504 505
/**
 * Set rows [0, pBlock->info.rows) of every destination column of the given expressions to NULL.
 *
 * @param pBlock    block whose columns are updated
 * @param pExpr     expressions; the destination column is taken from base.resSchema.slotId
 * @param numOfExpr number of expressions
 */
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
  int32_t idx = 0;
  while (idx < numOfExpr) {
    int32_t          slotId = pExpr[idx].base.resSchema.slotId;
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);

    colDataSetNNULL(pCol, 0, pBlock->info.rows);
    idx += 1;
  }
}

510 511
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
512
  // currently only the tbname pseudo column
513
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
514
    return TSDB_CODE_SUCCESS;
515 516
  }

517 518
  int32_t code = 0;

519 520 521 522
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

523
  bool            freeReader = false;
524
  STableCachedVal val = {0};
525 526

  SMetaReader mr = {0};
527
  LRUHandle*  h = NULL;
528

529 530 531
  // todo refactor: extract method
  // the handling of the null data should be packed in the extracted method

532
  // 1. check if it is existed in meta cache
533
  if (pCache == NULL) {
534
    pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0);
535
    code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
536
    if (code != TSDB_CODE_SUCCESS) {
537
      // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
H
Haojun Liao 已提交
538
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
S
slzhou 已提交
539 540
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);
541 542 543

        // append null value before return to caller, since the caller will ignore this error code and proceed
        doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
544
      } else {
S
slzhou 已提交
545 546
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
H
Haojun Liao 已提交
547
      }
548
      pHandle->api.metaReaderFn.clearReader(&mr);
549 550 551
      return terrno;
    }

552
    pHandle->api.metaReaderFn.readerReleaseLock(&mr);
553

554 555
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
556 557

    freeReader = true;
558
  } else {
559 560
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
561
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
562
    if (h == NULL) {
563
      pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, 0);
564
      code = pHandle->api.metaReaderFn.getEntryGetUidCache(&mr, pBlock->info.id.uid);
565
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
566
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
567
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
568
                pBlock->info.id.uid, tstrerror(terrno), idStr);
569 570
          // append null value before return to caller, since the caller will ignore this error code and proceed
          doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
571
        } else {
H
Haojun Liao 已提交
572
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
573
                 idStr);
H
Haojun Liao 已提交
574
        }
575
        pHandle->api.metaReaderFn.clearReader(&mr);
576 577 578
        return terrno;
      }

579
      pHandle->api.metaReaderFn.readerReleaseLock(&mr);
580

H
Haojun Liao 已提交
581
      STableCachedVal* pVal = createTableCacheVal(&mr);
582

H
Haojun Liao 已提交
583
      val = *pVal;
584
      freeReader = true;
H
Haojun Liao 已提交
585

H
Haojun Liao 已提交
586
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
587
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
588 589 590 591 592 593 594 595
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
596

H
Haojun Liao 已提交
597
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
598
    }
H
Haojun Liao 已提交
599

600 601
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
602
  }
603

604 605
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
606
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
607 608

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
609
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
610

611
    int32_t functionId = pExpr1->pExpr->_function.functionId;
612 613 614

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
615
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
616
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
617
      STagVal tagVal = {0};
618
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
619
      const char* p = pHandle->api.metaFn.extractTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
620

621 622 623 624
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
625
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
626
      }
627

H
Haojun Liao 已提交
628 629
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
630
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
631
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
D
dapan1121 已提交
632
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
H
Haojun Liao 已提交
633 634 635
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
D
dapan1121 已提交
636 637
        if (code) {
          if (freeReader) {
638
            pHandle->api.metaReaderFn.clearReader(&mr);
D
dapan1121 已提交
639 640 641
          }
          return code;
        }
L
Liu Jicong 已提交
642
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
643
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
644
          colDataSetVal(pColInfoData, i, data, false);
H
Haojun Liao 已提交
645
        }
646 647 648 649
      }
    }
  }

650 651
  // restore the rows
  pBlock->info.rows = backupRows;
652
  if (freeReader) {
653
    pHandle->api.metaReaderFn.clearReader(&mr);
654 655
  }

H
Haojun Liao 已提交
656
  return TSDB_CODE_SUCCESS;
657 658
}

H
Haojun Liao 已提交
659
/**
 * Fill a destination column with the table name by running the scalar function identified by functionId over a
 * one-row varchar column that holds the name.
 *
 * @param pBlock       block whose info.rows is passed to the function as the number of rows
 * @param pColInfoData destination column
 * @param functionId   id of the scalar function to execute; an error is logged when it has no process callback
 * @param name         table name, NUL terminated; must fit into TSDB_TABLE_FNAME_LEN bytes
 */
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
  // wrap the name into a one-row varchar column that serves as the function input
  char   nameBuf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  size_t bufLen = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE;
  STR_TO_VARSTR(nameBuf, name)

  SColumnInfoData nameCol = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, bufLen, 1);
  colInfoDataEnsureCapacity(&nameCol, 1, false);
  colDataSetVal(&nameCol, 0, nameBuf, false);

  SScalarParam input = {.numOfRows = pBlock->info.rows, .columnData = &nameCol};
  SScalarParam output = {.columnData = pColInfoData};

  struct SScalarFuncExecFuncs fpSet = {0};
  fmGetScalarFuncExecFuncs(functionId, &fpSet);

  if (fpSet.process == NULL) {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  } else {
    fpSet.process(&input, 1, &output);
  }

  colDataDestroy(&nameCol);
}

684
/**
 * Pull blocks from the data reader until one of them yields rows for the caller.
 *
 * For every block the reader offers, the group id is resolved from the table uid and loadDataBlock() decides
 * whether and how to load it. Blocks that are filtered out or end up empty are skipped. Reader errors, a killed
 * task and loadDataBlock() errors long-jump to the task's error handler.
 *
 * @param pOperator table scan operator
 * @return the operator's result block when it holds rows; NULL when the reader has no more blocks, the operator is
 *         already done, or every table of the scan has been put on the ignore list
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;
  SSDataBlock*    pResBlock = pScanInfo->pResBlock;

  bool    hasNext = false;
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t startUs = taosGetTimestampUs();

  for (;;) {
    code = pAPI->tsdReader.tsdNextDataBlock(pScanInfo->base.dataReader, &hasNext);
    if (code) {
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      return NULL;
    }

    if (isTaskKilled(pTaskInfo)) {
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (pOperator->status == OP_EXEC_DONE) {
      pAPI->tsdReader.tsdReaderReleaseDataBlock(pScanInfo->base.dataReader);
      return NULL;
    }

    // process this data block based on the probabilities
    if (!processBlockWithProbability(&pScanInfo->sample)) {
      continue;
    }

    if (pResBlock->info.id.uid) {
      pResBlock->info.id.groupId = getTableGroupId(pScanInfo->base.pTableListInfo, pResBlock->info.id.uid);
    }

    uint32_t status = 0;
    code = loadDataBlock(pOperator, &pScanInfo->base, pResBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // all tables are on the ignore list, the rest of this scan round yields nothing
    if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
      return NULL;
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pResBlock->info.rows == 0) {
      continue;
    }

    pOperator->resultInfo.totalRows = pScanInfo->base.readRecorder.totalRows;
    pScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - startUs) / 1000.0;

    pOperator->cost.totalCost = pScanInfo->base.readRecorder.elapsedTime;
    return pResBlock;
  }
}

H
Haojun Liao 已提交
750
/*
 * Produce the next data block for the table set currently bound to the reader.
 *
 * The scan runs scanInfo.numOfAsc ascending passes followed by
 * scanInfo.numOfDesc descending passes; scanTimes counts the passes that have
 * already been finished. Returns NULL once every pass is exhausted, when the
 * reader has not been created, or when the operator is already done.
 */
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  SStorageAPI*    pStore = &pTask->storageAPI;
  STableScanBase* pBase = &pScan->base;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pBase->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // Phase 1: ascending passes.
  while (pScan->scanTimes < pScan->scanInfo.numOfAsc) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    // the current pass is exhausted
    pScan->scanTimes += 1;
    taosHashClear(pScan->pIgnoreTables);

    if (pScan->scanTimes >= pScan->scanInfo.numOfAsc) {
      break;
    }

    // one more ascending pass is required: rewind the reader for it
    setTaskStatus(pTask, TASK_NOT_COMPLETED);
    pBase->scanFlag = MAIN_SCAN;
    pBase->dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
    qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTask));

    pStore->tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
  }

  // Phase 2: descending passes.
  int32_t total = pScan->scanInfo.numOfAsc + pScan->scanInfo.numOfDesc;
  if (pScan->scanTimes >= total) {
    return NULL;
  }

  // the condition is still in ascending order: switch it before the first descending pass
  if (pBase->cond.order == TSDB_ORDER_ASC) {
    prepareForDescendingScan(pBase, pOperator->exprSupp.pCtx, 0);
    pStore->tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTask));
  }

  while (pScan->scanTimes < total) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pScan->scanTimes += 1;
    taosHashClear(pScan->pIgnoreTables);

    if (pScan->scanTimes >= total) {
      break;
    }

    setTaskStatus(pTask, TASK_NOT_COMPLETED);
    pBase->scanFlag = MAIN_SCAN;

    qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTask));
    pStore->tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
  }

  return NULL;
}

/*
 * Entry point of the table scan operator: return the next non-empty block, or
 * NULL when there is nothing more to read.
 *
 * Two modes are supported:
 *  - TABLE_SCAN__TABLE_ORDER: tables are scanned one at a time, in table-list
 *    order; when a table is exhausted the reader is re-targeted to the next one.
 *  - otherwise: table groups are scanned one at a time. The reader is opened
 *    lazily for the first group and re-targeted for every following group.
 *
 * In group mode the operator is marked completed only after the last group has
 * been tried. A group that yields no block no longer terminates the scan;
 * previously every group after the first empty one was silently dropped.
 *
 * Errors from opening the reader are raised through T_LONG_JMP.
 */
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*    pAPI = &pTaskInfo->storageAPI;

  // scan table one by one sequentially
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    int32_t       numOfTables = 0;
    STableKeyInfo tInfo = {0};

    while (1) {
      SSDataBlock* result = doGroupedTableScan(pOperator);
      if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
        return result;
      }

      // if no data, switch to next table and continue scan
      pInfo->currentTable++;

      // the table list is read under the task latch; copy the entry out before releasing it
      taosRLockLatch(&pTaskInfo->lock);
      numOfTables = tableListGetSize(pInfo->base.pTableListInfo);

      if (pInfo->currentTable >= numOfTables) {
        qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
        taosRUnLockLatch(&pTaskInfo->lock);
        return NULL;
      }

      tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable);
      taosRUnLockLatch(&pTaskInfo->lock);

      pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1);
      qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
             pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));

      pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
      pInfo->scanTimes = 0;
    }
  }

  // scan table group by group sequentially
  if (pInfo->currentGroupId == -1) {
    // first call: move to group 0 and open the reader on it
    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    int32_t        num = 0;
    STableKeyInfo* pList = NULL;
    tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
    ASSERT(pInfo->base.dataReader == NULL);

    int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num,
                                                 pInfo->pResBlock, (void**)&pInfo->base.dataReader,
                                                 GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // adopt the larger capacity if the result block has grown past the operator's
    if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
      pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
    }
  }

  SSDataBlock* result = doGroupedTableScan(pOperator);
  if (result != NULL) {
    return result;
  }

  // The current group is exhausted: keep advancing until some group produces a
  // block or no group is left.
  while (1) {
    if (isTaskKilled(pTaskInfo)) {
      return NULL;
    }

    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    // reset value for the next group data output
    pOperator->status = OP_OPENED;
    resetLimitInfoForNextGroup(&pInfo->base.limitInfo);

    int32_t        num = 0;
    STableKeyInfo* pList = NULL;
    tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);

    pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num);
    pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
    pInfo->scanTimes = 0;

    result = doGroupedTableScan(pOperator);
    if (result != NULL) {
      return result;
    }
  }
}

904 905
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
906
  STableScanInfo*         pTableScanInfo = pOptr->info;
H
Haojun Liao 已提交
907
  *pRecorder = pTableScanInfo->base.readRecorder;
908 909 910 911 912
  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

913
/*
 * Release everything owned by a STableScanBase: the query condition, the data
 * reader, the column match list, the table list, the table-meta LRU cache and
 * the pseudo-column expression support.
 *
 * @param pBase  scan base to tear down (the struct itself is not freed)
 * @param pAPI   reader API table used to close the data reader. Its
 *               tsdReaderClose entry may still be NULL when the owning operator
 *               failed before the API table was installed; in that case the
 *               close call is skipped.
 */
static void destroyTableScanBase(STableScanBase* pBase, TsdReader* pAPI) {
  cleanupQueryTableDataCond(&pBase->cond);

  // guard against a zero-initialized API table (partially constructed operator)
  if (pAPI->tsdReaderClose != NULL) {
    pAPI->tsdReaderClose(pBase->dataReader);
  }
  pBase->dataReader = NULL;

  if (pBase->matchInfo.pList != NULL) {
    taosArrayDestroy(pBase->matchInfo.pList);
  }

  tableListDestroy(pBase->pTableListInfo);
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}

static void destroyTableScanOperatorInfo(void* param) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  blockDataDestroy(pTableScanInfo->pResBlock);
931
  taosHashCleanup(pTableScanInfo->pIgnoreTables);
H
Haojun Liao 已提交
932
  destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
D
dapan1121 已提交
933
  taosMemoryFreeClear(param);
934 935
}

936
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
937
                                           STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
X
Xiaoyu Wang 已提交
938
  int32_t         code = 0;
H
Haojun Liao 已提交
939 940 941
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
942
    code = TSDB_CODE_OUT_OF_MEMORY;
943
    goto _error;
H
Haojun Liao 已提交
944 945
  }

946
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
947
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
948 949

  int32_t numOfCols = 0;
X
Xiaoyu Wang 已提交
950
  code =
H
Haojun Liao 已提交
951
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
952 953 954 955
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
956
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
957
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
958
  if (code != TSDB_CODE_SUCCESS) {
959
    goto _error;
960 961
  }

H
Haojun Liao 已提交
962
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
963
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
964
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
965
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
966 967
  }

968
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
G
Ganlin Zhao 已提交
969
  pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
H
Haojun Liao 已提交
970

H
Haojun Liao 已提交
971 972
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
973 974
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

975 976
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
977

H
Haojun Liao 已提交
978
  pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
H
Haojun Liao 已提交
979
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
980
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
X
Xiaoyu Wang 已提交
981
  //  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
982

H
Haojun Liao 已提交
983 984 985
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
986 987
  }

wmmhello's avatar
wmmhello 已提交
988
  pInfo->currentGroupId = -1;
989
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
990
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
991

L
Liu Jicong 已提交
992 993
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
994
  pOperator->exprSupp.numOfExprs = numOfCols;
995

996
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
997 998
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
999 1000 1001
    code = terrno;
    goto _error;
  }
1002

D
dapan1121 已提交
1003 1004 1005 1006
  if (scanDebug) {
    pInfo->countOnly = true;
  }

H
Haojun Liao 已提交
1007
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
1008 1009
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
1010 1011 1012

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
1013
  return pOperator;
1014

1015
_error:
1016 1017 1018
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
1019

1020 1021
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
1022
  return NULL;
H
Haojun Liao 已提交
1023 1024
}

1025
/*
 * Build a sequential table scan operator on top of an already opened reader.
 *
 * @param pReadHandle  reader handle stored as the operator's data reader
 * @param pTaskInfo    owning task; receives the error code on failure
 * @return the new operator, or NULL with pTaskInfo->code set to
 *         TSDB_CODE_OUT_OF_MEMORY when an allocation fails
 *
 * On failure the passed reader handle is left untouched.
 */
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // either allocation may have succeeded; release whichever did
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->base.dataReader = pReadHandle;

  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
  return pOperator;
}

1038
/* Drop every buffered block of the stream scan and rewind the read cursor. */
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  int32_t numOfBuffered = (int32_t)taosArrayGetSize(pInfo->pBlockLists);
  qDebug("clear buff blocks:%d", numOfBuffered);

  pInfo->validBlockIndex = 0;
  taosArrayClear(pInfo->pBlockLists);
}

1044
/* True when the stream scan's parent operator is a stream session window. */
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION) {
    return true;
  }
  return false;
}

1048
/* True when the stream scan's parent operator is a stream state window. */
static bool isStateWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
    return true;
  }
  return false;
}
5
54liuyao 已提交
1051

L
Liu Jicong 已提交
1052
/*
 * True when the stream scan's parent operator is any of the stream interval
 * variants (plain, semi or final interval).
 */
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  switch (pInfo->windowSup.parentType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
      return true;
    default:
      return false;
  }
}

/* True only for the plain stream interval parent (not the semi/final variants). */
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
    return true;
  }
  return false;
}

1062 1063 1064 1065
/* An interval window whose sliding step differs from its interval length. */
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.interval != pInfo->interval.sliding;
}

1066
/*
 * Store into pInfo->groupId the value found at row `rowIndex` of column
 * `groupColIndex` of pBlock; the column data is read as uint64_t values.
 */
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  uint64_t*        pGroupIds = (uint64_t*)pGroupCol->pData;

  ASSERT(rowIndex < pBlock->info.rows);
  pInfo->groupId = pGroupIds[rowIndex];
}

1073
/*
 * Re-arm a table scan for a new time window and end version: the pass counter
 * and group cursor are reset and the current reader is closed, so the next
 * scan has to open a fresh one.
 */
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t ver) {
  STableScanBase* pBase = &pTableScanInfo->base;

  pBase->cond.twindows = *pWin;
  pBase->cond.endVersion = ver;

  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;

  pBase->readerAPI.tsdReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;
}

L
Liu Jicong 已提交
1082 1083
/*
 * Read the rows of one table that fall inside [startTs, endTs] and are visible
 * up to version `maxVersion`, using a temporary reader.
 *
 * @param pTableScanOp  table scan operator; its result block is reused as the
 *                      output buffer and its query condition as the template
 * @param tbUid         uid of the table to read
 * @param startTs       start of the time window
 * @param endTs         end of the time window
 * @param maxVersion    end version applied to the read
 * @return the operator's result block when it holds at least one row, else NULL
 *
 * Reader failures are raised through T_LONG_JMP. The temporary reader is
 * closed on every path after it was opened, including the failure path of
 * tsdNextDataBlock, which previously leaked it.
 */
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};

  // work on a private copy of the condition so the operator's own one is untouched
  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
  SQueryTableDataCond cond = pTableScanInfo->base.cond;

  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  STsdbReader* pReader = NULL;
  int32_t      code = pAPI->tsdReader.tsdReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
                                                    (void**)&pReader, GET_TASKID(pTaskInfo), false, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  bool hasNext = false;
  code = pAPI->tsdReader.tsdNextDataBlock(pReader, &hasNext);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    // the long jump never returns here, so the reader must be released first
    pAPI->tsdReader.tsdReaderClose(pReader);
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  if (hasNext) {
    /*SSDataBlock* p = */ pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, NULL);
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
    pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
  }

  pAPI->tsdReader.tsdReaderClose(pReader);
  qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);

  return pBlock->info.rows > 0 ? pBlock : NULL;
}

/*
 * Compute the group id of the single row of table `uid` at timestamp `ts`, as
 * visible up to `maxVersion`, from the row's data. Returns 0 when no such row
 * can be read.
 */
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPrev = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (pPrev == NULL || pPrev->info.rows == 0) {
    return 0;
  }

  // the window [ts, ts] is expected to match exactly one row
  ASSERT(pPrev->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPrev, 0);
}

5
54liuyao 已提交
1137
/* Look up the group id of table `uid` in the table list of the underlying table scan. */
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  STableScanInfo* pScan = pInfo->pTableScanOp->info;
  STableListInfo* pTables = pScan->base.pTableListInfo;
  return getTableGroupId(pTables, uid);
}

5
54liuyao 已提交
1142 1143 1144 1145 1146 1147 1148 1149
/*
 * Resolve the group id of a row: computed from the row data when the
 * partition support requires calculation, otherwise looked up by table uid.
 */
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1150
/*
 * Take the next scan range out of a range block and arm the underlying table
 * scan for it.
 *
 * Starting at row *pRowIndex, the row's [start, end] window and group id are
 * read; following rows of the same group that share the window start, or whose
 * end equals the window start, are merged into the window. *pRowIndex is left
 * on the first row that was not consumed.
 *
 * @param pInfo      stream scan info; groupId (and updateWin for sliding
 *                   windows) are updated from the first consumed row
 * @param pBlock     range block with start/end ts, group id and calculate
 *                   start/end columns
 * @param pRowIndex  in/out cursor into pBlock
 * @return false when the block is empty or the cursor is at or past the last
 *         row, true when the table scan has been reset for a new range
 */
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  // nothing left to consume; >= also rejects a cursor that overshot the block
  if (pBlock->info.rows == 0 || (*pRowIndex) >= pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  STimeWindow      win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        gpData = (uint64_t*)pGpCol->pData;
  uint64_t         groupId = gpData[*pRowIndex];

  SColumnInfoData* pCalStartTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  TSKEY*           calStartData = (TSKEY*)pCalStartTsCol->pData;
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  TSKEY*           calEndData = (TSKEY*)pCalEndTsCol->pData;

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[*pRowIndex];
    pInfo->updateWin.ekey = calEndData[*pRowIndex];
  }
  (*pRowIndex)++;

  // merge the following rows of the same group that touch the current window
  for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
    if (win.skey == startData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
      win.ekey = TMAX(win.ekey, endData[*pRowIndex]);
      continue;
    }

    if (win.skey == endData[*pRowIndex] && groupId == gpData[*pRowIndex]) {
      win.skey = TMIN(win.skey, startData[*pRowIndex]);
      continue;
    }

    ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
           !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
    break;
  }

  // restart the table scan on the merged window
  STableScanInfo* pTScanInfo = pInfo->pTableScanOp->info;
  resetTableScanInfo(pTScanInfo, &win, pInfo->pUpdateInfo->maxDataVersion);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1202
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1203
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1204
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1205
  dumyInfo.cur.pageId = -1;
1206
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1207 1208
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1209
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1210

5
54liuyao 已提交
1211
  while (1) {
1212 1213 1214
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1215
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1216 1217 1218 1219 1220
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1221
    }
5
54liuyao 已提交
1222

5
54liuyao 已提交
1223 1224 1225
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1226
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1227
    endWin = preWin;
5
54liuyao 已提交
1228
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1229 1230 1231 1232 1233 1234
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1235

L
Liu Jicong 已提交
1236
/*
 * Scan the table data covered by the ranges listed in pSDB and return the next
 * block that belongs to the group of the range being processed.
 *
 * Whenever the table scan runs dry, the next range is taken from pSDB via
 * prepareRangeScan(). Once no range is left, pSDB is cleaned, *pRowIndex and
 * the update window are reset, the table scan reader is closed and NULL is
 * returned.
 *
 * When the partition support requires calculation, each scanned block is
 * filtered row by row so that only rows whose calculated group id equals
 * pInfo->groupId remain; otherwise the block is returned only when its own
 * group id matches. Returned blocks carry pInfo->updateWin as calWin.
 */
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  qInfo("do stream range scan. windows index:%d", *pRowIndex);

  bool hasRange = true;
  for (;;) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult == NULL) {
      hasRange = prepareRangeScan(pInfo, pSDB, pRowIndex);
      // scan next window data
      pResult = doTableScan(pInfo->pTableScanOp);
    }

    if (pResult == NULL) {
      if (hasRange) {
        continue;
      }

      // every range has been consumed: reset the range state and release the reader
      blockDataCleanup(pSDB);
      *pRowIndex = 0;
      pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};

      STableScanInfo* pScan = pInfo->pTableScanOp->info;
      pScan->base.readerAPI.tsdReaderClose(pScan->base.dataReader);
      pScan->base.dataReader = NULL;
      return NULL;
    }

    doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
    if (pResult->info.rows == 0) {
      continue;
    }

    if (!pInfo->partitionSup.needCalc) {
      if (pResult->info.id.groupId == pInfo->groupId) {
        pResult->info.calWin = pInfo->updateWin;
        return pResult;
      }
      continue;
    }

    // keep only the rows whose calculated group id matches the current group
    SSDataBlock* pCopy = createOneDataBlock(pResult, true);
    blockDataCleanup(pResult);
    for (int32_t row = 0; row < pCopy->info.rows; row++) {
      if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pCopy, row) != pInfo->groupId) {
        continue;
      }

      for (int32_t col = 0; col < pInfo->pTableScanOp->exprSupp.numOfExprs; col++) {
        SColumnInfoData* pFrom = taosArrayGet(pCopy->pDataBlock, col);
        SColumnInfoData* pTo = taosArrayGet(pResult->pDataBlock, col);
        bool             isNull = colDataIsNull(pFrom, pCopy->info.rows, row, NULL);
        char*            pVal = colDataGetData(pFrom, row);
        colDataSetVal(pTo, pResult->info.rows, pVal, isNull);
      }
      pResult->info.rows++;
    }

    blockDataDestroy(pCopy);

    if (pResult->info.rows > 0) {
      pResult->info.calWin = pInfo->updateWin;
      return pResult;
    }
  }
}
1293

1294
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
X
Xiaoyu Wang 已提交
1295
                                   SSessionKey* pKey) {
1296 1297 1298
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
X
Xiaoyu Wang 已提交
1299

1300 1301
  void* pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
  int32_t          code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
1302 1303 1304
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
D
dapan1121 已提交
1305 1306

  taosMemoryFree(pCur);
1307 1308 1309
  return code;
}

1310
/*
 * Translate the [start, end] timestamp pairs of pSrcBlock into session-window
 * scan ranges written to pDestBlock.
 *
 * For every source row the session window containing the start timestamp and
 * the one containing the end timestamp are looked up; if the latter does not
 * exist the state store is asked through getPreSessionWindow(). The emitted
 * range is [startWin.skey, endWin.ekey] with the row's group id; the uid and
 * calculate start/end columns are set to NULL.
 *
 * Rows whose start or end window cannot be resolved are skipped. Output rows
 * are written densely at index pDestBlock->info.rows, so skipped rows leave no
 * holes and the row count always matches the rows actually written.
 *
 * @return TSDB_CODE_SUCCESS, or the error of reserving capacity in pDestBlock
 */
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  // group ids are resolved against the version preceding the source block
  int64_t version = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);

    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }

    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    }
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // neither lookup produced an end window: no valid range can be built
      qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
      continue;
    }

    // append at the next free destination row, not at the source row index
    int32_t destRow = (int32_t)pDestBlock->info.rows;
    colDataSetVal(pDestStartCol, destRow, (const char*)&startWin.win.skey, false);
    colDataSetVal(pDestEndCol, destRow, (const char*)&endWin.win.ekey, false);

    colDataSetNULL(pDestUidCol, destRow);
    colDataSetVal(pDestGpCol, destRow, (const char*)&groupId, false);
    colDataSetNULL(pDestCalStartTsCol, destRow);
    colDataSetNULL(pDestCalEndTsCol, destRow);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1364 1365 1366 1367 1368 1369

/**
 * Build the rescan ranges for an interval-window stream.
 *
 * pDestBlock is cleaned and then receives one row per window returned by getSlidingWindow().
 * Every output row holds the window start/end, the uid and group id of the first source row
 * consumed for that window, and the calculate-start/end timestamps (start ts of the first and
 * of the last source row consumed).
 *
 * When pInfo->partitionSup.needCalc is set and the first source row covers a range
 * (start ts != end ts), pSrcBlock is rebuilt in place from the block returned by
 * readPreVersionData(): one row per returned row, group id recomputed by calGroupIdByData().
 *
 * @param pInfo      stream scan state (interval, partition support, table scan operator)
 * @param pSrcBlock  special block describing the affected ranges; may be rewritten (see above)
 * @param pDestBlock output block
 * @return TSDB_CODE_SUCCESS, or the code returned by a failing blockDataEnsureCapacity()
 */
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);

  int32_t numOfRows = pSrcBlock->info.rows;
  if (numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);

  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
    // Expand the first range into the individual rows of the previous data version.
    uint64_t firstUid = srcUidData[0];
    TSKEY    rangeStart = srcStartTsCol[0];
    TSKEY    rangeEnd = srcEndTsCol[0];

    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, firstUid, rangeStart, rangeEnd, version);
    printDataBlock(pPreRes, "pre res");

    blockDataCleanup(pSrcBlock);
    int32_t srcCode = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (srcCode != TSDB_CODE_SUCCESS) {
      return srcCode;
    }

    SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    numOfRows = pPreRes->info.rows;

    for (int32_t row = 0; row < numOfRows; ++row) {
      uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, row);
      TSKEY*   pRowTs = ((TSKEY*)pTsCol->pData) + row;
      appendOneRowToStreamSpecialBlock(pSrcBlock, pRowTs, pRowTs, &firstUid, &groupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }

  // Re-read the raw column buffers: the source block may have been rebuilt above.
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  srcUidData = (uint64_t*)pSrcUidCol->pData;

  int32_t code = blockDataEnsureCapacity(pDestBlock, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t i = 0;
  while (i < numOfRows) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }

    TSKEY calStartTs = srcStartTsCol[i];
    colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);

    // getSlidingWindow() advances i past the source rows it consumed for this window.
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
                                       pInfo->partitionSup.needCalc);

    TSKEY calEndTs = srcStartTsCol[i - 1];
    colDataSetVal(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataSetVal(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);

    pDestBlock->info.rows++;
  }

  return TSDB_CODE_SUCCESS;
}
1441

1442
/**
 * Copy the delete ranges of pSrcBlock into pDestBlock, resolving the group id and, when a
 * table-name expression is configured, the partition table name of every row.
 *
 * pDestBlock is cleaned first. For each source row the group id is taken from the source block,
 * or computed with getGroupIdByData() when the stored value is 0. If pInfo->tbnameCalSup.pExprInfo
 * is set, the partition table name registered for the group id is looked up in the stream state
 * and attached to the row as a var-string; rows without a (non-empty) name get a NULL name.
 *
 * @param pInfo      stream scan state
 * @param pSrcBlock  special block holding start ts / end ts / uid / group id columns
 * @param pDestBlock output block
 * @return TSDB_CODE_SUCCESS, or the code returned by a failing blockDataEnsureCapacity()
 */
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];

    // Var-string buffer: length header + name. It is a plain byte buffer (the previous declaration
    // was an array of char pointers). One spare byte keeps the copied name NUL-terminated for strlen().
    char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN + 1] = {0};
    bool hasTbName = false;

    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }

    if (pInfo->tbnameCalSup.pExprInfo) {
      void* parTbname = NULL;
      pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);

      // parTbname stays NULL when the lookup yields nothing; do not copy from / free a NULL value.
      if (parTbname != NULL) {
        // NOTE(review): assumes the stored value is at least TSDB_TABLE_NAME_LEN bytes, as before.
        memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
        size_t nameLen = strlen(varDataVal(tbname));
        varDataSetLen(tbname, nameLen);
        hasTbName = (nameLen > 0);
        pInfo->stateStore.streamStateFreeVal(parTbname);
      }
    }

    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     hasTbName ? tbname : NULL);
  }
  return TSDB_CODE_SUCCESS;
}

1484 1485 1486 1487
/**
 * Dispatch to the range generator matching the stream's window type and stamp the result block.
 *
 * Interval windows use generateIntervalScanRange(), session and state windows use
 * generateSessionScanRange(), everything else uses generateDeleteResultBlock().
 * Regardless of the generator's result, pDestBlock is marked STREAM_CLEAR, inherits the
 * source version, is flagged as loaded and has its timestamp window refreshed.
 *
 * @return the code returned by the selected generator
 */
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (isIntervalWindow(pInfo)) {
    code = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
  } else {
    const bool sessionLike = isSessionWindow(pInfo) || isStateWindow(pInfo);
    if (sessionLike) {
      code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock);
    } else {
      code = generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
    }
  }

  SDataBlockInfo* pDestInfo = &pDestBlock->info;
  pDestInfo->type = STREAM_CLEAR;
  pDestInfo->version = pSrcBlock->info.version;
  pDestInfo->dataLoad = 1;
  blockDataUpdateTsWindow(pDestBlock, 0);

  return code;
}

5
54liuyao 已提交
1500
/**
 * Prepare the create-table result block for pBlock.
 *
 * pInfo->pCreateTbRes is always cleaned. If neither a table-name expression nor tag expressions
 * are configured, the block's partition table name is cleared; otherwise appendCreateTableRow()
 * is invoked for the block's group id using row 0 of pBlock.
 */
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  blockDataCleanup(pInfo->pCreateTbRes);

  if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
    pBlock->info.parTbName[0] = 0;
    return;
  }

  appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
                       pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, &pInfo->stateStore);
}

1511 1512
/**
 * Append one row to a stream "special" block and bump its row count.
 *
 * The row is written at index pBlock->info.rows. The calculate-start/end columns receive the same
 * values as the start/end timestamp columns. The table-name column is set to NULL when pTbName is NULL.
 * The caller is expected to have reserved capacity for the new row; no capacity check is done here.
 *
 * @param pBlock   destination block
 * @param pStartTs start timestamp of the row
 * @param pEndTs   end timestamp of the row
 * @param pUid     table uid
 * @param pGp      group id
 * @param pTbName  value for the table-name column, or NULL
 */
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  SArray* pCols = pBlock->pDataBlock;

  SColumnInfoData* pStartTsCol = taosArrayGet(pCols, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pCols, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pCols, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pCols, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pCols, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pCols, CALCULATE_END_TS_COLUMN_INDEX);
  SColumnInfoData* pTableCol = taosArrayGet(pCols, TABLE_NAME_COLUMN_INDEX);

  const char* startTs = (const char*)pStartTs;
  const char* endTs = (const char*)pEndTs;
  const bool  nameIsNull = (pTbName == NULL);

  colDataSetVal(pStartTsCol, pBlock->info.rows, startTs, false);
  colDataSetVal(pEndTsCol, pBlock->info.rows, endTs, false);
  colDataSetVal(pUidCol, pBlock->info.rows, (const char*)pUid, false);
  colDataSetVal(pGpCol, pBlock->info.rows, (const char*)pGp, false);
  colDataSetVal(pCalStartCol, pBlock->info.rows, startTs, false);
  colDataSetVal(pCalEndCol, pBlock->info.rows, endTs, false);
  colDataSetVal(pTableCol, pBlock->info.rows, (const char*)pTbName, nameIsNull);

  pBlock->info.rows++;
}

1530
/**
 * Run the update check on every row of pBlock and, when requested, collect the affected rows.
 *
 * For each row (rows that are overdue while pInfo->igExpired is set are skipped entirely):
 *  - updateInfoIsUpdated() is called for the row's uid/timestamp;
 *  - if the table is already known to the update info and the row is overdue, the row's interval
 *    window is computed and tested with isCloseWindow(); a closed window of a single-interval
 *    stream is further tested with isDeletedStreamWindow().
 * When `out` is true and the row is an update or hits such a closed window, a row with group id 0
 * is appended to pInfo->pUpdateDataRes; if the window was closed and partition calculation is
 * needed, a second row with the calculated group id is appended.
 *
 * @param pInfo      stream scan state
 * @param invertible unused in this function
 * @param pBlock     data block to check
 * @param out        true to (re)build pInfo->pUpdateDataRes, false to only run the checks
 */
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
    // up to two rows may be appended per input row
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
  }

  SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pTsColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* tsCol = (TSKEY*)pTsColInfo->pData;

  bool tableInserted = pInfo->stateStore.updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);

  for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) {
    SResultRowInfo dumyInfo;
    dumyInfo.cur.pageId = -1;

    TSKEY* pRowTs = tsCol + rowId;
    bool   overDue = isOverdue(*pRowTs, &pInfo->twAggSup);
    if (overDue && pInfo->igExpired) {
      continue;
    }

    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    bool        isClosed = false;
    if (tableInserted && overDue) {
      win = getActiveTimeWindow(NULL, &dumyInfo, *pRowTs, &pInfo->interval, TSDB_ORDER_ASC);
      isClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }

    // must check update info first.
    bool update = pInfo->stateStore.updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, *pRowTs);

    bool closedWin = false;
    if (isClosed && isSignleIntervalWindow(pInfo)) {
      closedWin =
          isDeletedStreamWindow(&win, pBlock->info.id.groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore);
    }

    if (!out || !(update || closedWin)) {
      continue;
    }

    qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
    uint64_t gpId = 0;
    appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pRowTs, pRowTs, &pBlock->info.id.uid, &gpId, NULL);

    if (closedWin && pInfo->partitionSup.needCalc) {
      gpId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, rowId);
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pRowTs, pRowTs, &pBlock->info.id.uid, &gpId, NULL);
    }
  }

  if (!out) {
    return;
  }

  SSDataBlock* pUpdateRes = pInfo->pUpdateDataRes;
  if (pUpdateRes->info.rows > 0) {
    pUpdateRes->info.version = pBlock->info.version;
    pUpdateRes->info.dataLoad = 1;
    blockDataUpdateTsWindow(pUpdateRes, 0);
    pUpdateRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
  }
}
L
Liu Jicong 已提交
1576

1577
/**
 * Load pBlock into the operator's result block (pInfo->pRes).
 *
 * Steps: reserve capacity, copy row count / uid / version and mark the result STREAM_NORMAL,
 * resolve the group id from the table list, copy every required column by column id (columns
 * missing from pBlock are filled with NULLs), add pseudo-column data when pseudo expressions are
 * configured, optionally apply the operator filter, refresh the timestamp window and finally
 * compute the create-table row via calBlockTbName().
 *
 * If adding pseudo-column data fails with anything other than TSDB_CODE_PAR_TABLE_NOT_EXIST, pBlock
 * is released with blockDataFreeRes() and the task long-jumps with that code.
 *
 * @param pInfo  stream scan state
 * @param pBlock source block
 * @param filter true to run doFilter() on the result
 * @return always 0
 */
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SSDataBlock*    pResBlock = pInfo->pRes;
  SDataBlockInfo* pBlockInfo = &pResBlock->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  blockDataEnsureCapacity(pResBlock, pBlock->info.rows);

  pBlockInfo->rows = pBlock->info.rows;
  pBlockInfo->id.uid = pBlock->info.id.uid;
  pBlockInfo->type = STREAM_NORMAL;
  pBlockInfo->version = pBlock->info.version;

  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
  pBlockInfo->id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);

  // todo extract method
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
      continue;
    }

    // look for the source column carrying the requested column id
    SColumnInfoData* pSrcCol = NULL;
    for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
      SColumnInfoData* pCandidate = bdGetColumnInfoData(pBlock, j);
      if (pCandidate->info.colId == pColMatchInfo->colId) {
        pSrcCol = pCandidate;
        break;
      }
    }

    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, pColMatchInfo->dstSlotId);
    if (pSrcCol != NULL) {
      colDataAssign(pDst, pSrcCol, pBlock->info.rows, &pResBlock->info);
    } else {
      // the required column does not exist in the submit block: fill it with NULL values
      colDataSetNNULL(pDst, 0, pBlockInfo->rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pResBlock,
                                          pBlockInfo->rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache);

    // a missing table is tolerated: it may have been dropped during the scan procedure
    bool fatal = (code != TSDB_CODE_SUCCESS) && (code != TSDB_CODE_PAR_TABLE_NOT_EXIST);
    if (fatal) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reset the error code.
    terrno = 0;
  }

  if (filter) {
    doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
  }

  pBlockInfo->dataLoad = 1;
  blockDataUpdateTsWindow(pResBlock, pInfo->primaryTsIndex);

  calBlockTbName(pInfo, pResBlock);
  return 0;
}
5
54liuyao 已提交
1642

L
Liu Jicong 已提交
1643
/**
 * Produce the next block for a queue (tmq) scan.
 *
 * While the current offset is of type TMQ_OFFSET__SNAPSHOT_DATA the data comes from the table scan
 * operator; every non-empty block moves the offset to (uid, window end key) and is returned.
 * When the table scan is exhausted the data reader is closed, the tq reader is positioned at
 * snapshotVer + 1 and the offset switches to the log.
 *
 * While the offset is of type TMQ_OFFSET__LOG, blocks are pulled from the WAL through the tq reader,
 * loaded into pInfo->pRes with filtering enabled, and returned as soon as a non-empty result exists.
 *
 * @return the next result block, or NULL when there is no more data, the seek failed, or the
 *         offset type is unexpected
 */
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  SStreamScanInfo* pInfo = pOperator->info;
  const char*      id = GET_TASKID(pTaskInfo);

  qDebug("start to exec queue scan, %s", id);

  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
    if (pResult && pResult->info.rows > 0) {
      tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
      return pResult;
    }

    // snapshot data exhausted: release the tsdb reader and continue from the WAL
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);

    pTSInfo->base.dataReader = NULL;
    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
    if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
      return NULL;
    }

    tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
  }

  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
    while (1) {
      bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id);

      // NOTE(review): pRes is never assigned from the tq reader in this function, so it is always
      // NULL here. It presumably has to be fetched from pInfo->tqReader through the storage API
      // after tqReaderNextBlockInWal() succeeds -- TODO wire up the proper accessor. Until then the
      // guard below prevents the NULL dereference that the unguarded code performed.
      SSDataBlock*       pRes = NULL;
      struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);

      // curVersion move to next, so currentOffset = curVersion - 1
      tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion - 1);

      if (!hasResult) {
        qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
        return NULL;
      }

      if (pRes == NULL) {
        qError("doQueueScan get data from log but result block is NULL, version:%" PRId64 ", %s",
               pTaskInfo->streamInfo.currentOffset.version, id);
        return NULL;
      }

      qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
             pTaskInfo->streamInfo.currentOffset.version);
      blockDataCleanup(pInfo->pRes);
      setBlockIntoRes(pInfo, pRes, true);
      if (pInfo->pRes->info.rows > 0) {
        return pInfo->pRes;
      }
    }
  } else {
    qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
    return NULL;
  }
}

L
Liu Jicong 已提交
1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718
/**
 * Copy into pDst only those delete rows of pSrc whose uid belongs to a table queried by the tq reader.
 *
 * For every kept row the start ts, end ts and uid are copied; the group id and the
 * calculate-start/end columns are set to NULL. Afterwards pDst->info is overwritten with
 * pSrc->info, except that the row count is the number of kept rows and pDst keeps its own capacity.
 *
 * @param pDst  destination block
 * @param pSrc  source delete block
 * @param pInfo stream scan state (provides the tq reader and reader functions)
 * @return 0 on success, or the code returned by a failing blockDataEnsureCapacity(), in which case
 *         pDst is left without any copied rows
 */
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;

  // writing rows without reserved capacity would run past the column buffers
  int32_t code = blockDataEnsureCapacity(pDst, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  uint64_t*        startCol = (uint64_t*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  uint64_t*        endCol = (uint64_t*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  // loop-invariant lookups, resolved once instead of once per kept row
  SColumnInfoData* pDstGpCol = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t j = 0;
  for (int32_t i = 0; i < rows; i++) {
    if (!pInfo->readerFn.tqReaderIsQueriedTable(pReader, uidCol[i])) {
      continue;
    }

    colDataSetVal(pDstStartCol, j, (const char*)&startCol[i], false);
    colDataSetVal(pDstEndCol, j, (const char*)&endCol[i], false);
    colDataSetVal(pDstUidCol, j, (const char*)&uidCol[i], false);

    colDataSetNULL(pDstGpCol, j);
    colDataSetNULL(pDstCalStartCol, j);
    colDataSetNULL(pDstCalEndCol, j);
    j++;
  }

  // take over the source block info, but keep the destination's own capacity
  uint32_t cap = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.rows = j;
  pDst->info.capacity = cap;

  return 0;
}

5
54liuyao 已提交
1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753
/**
 * For partition by tag: fill the group-id column of a special block from each row's uid.
 *
 * Does nothing when pInfo->partitionSup.needCalc is set. Otherwise every row's group id is
 * replaced by getGroupIdByUid() of that row's uid.
 */
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataSetVal(pGpCol, i, (const char*)&groupId, false);
  }
}

5
54liuyao 已提交
1759
/**
 * Run the update check for pBlock and select the next scan mode from its outcome.
 *
 * Nothing happens when update checking is disabled (pInfo->igCheckUpdate) or no update info
 * exists. Otherwise the maximum data version and maximum timestamp are advanced,
 * checkUpdateData() rebuilds pInfo->pUpdateDataRes, and if that block is non-empty the scan mode
 * is chosen from its type (CLEAR -> update result, INVERT -> result, DELETE_DATA -> delete data).
 *
 * @param pInfo  stream scan state
 * @param endKey candidate for the new maximum timestamp
 * @param pBlock data block to check
 */
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
  if (pInfo->igCheckUpdate || !pInfo->pUpdateInfo) {
    return;
  }

  pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
  checkUpdateData(pInfo, true, pBlock, true);
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);

  if (!(pInfo->pUpdateDataRes->info.rows > 0)) {
    return;
  }

  pInfo->updateResIndex = 0;
  switch (pInfo->pUpdateDataRes->info.type) {
    case STREAM_CLEAR:
      pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      break;
    case STREAM_INVERT:
      pInfo->scanMode = STREAM_SCAN_FROM_RES;
      break;
    case STREAM_DELETE_DATA:
      pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
      break;
    default:
      // any other block type leaves the scan mode untouched
      break;
  }
}

1778 1779 1780 1781 1782 1783
//int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
//  int32_t len = updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
//  *pBuff = taosMemoryCalloc(1, len);
//  updateInfoSerialize(*pBuff, len, pInfo->pUpdateInfo);
//  return len;
//}
L
liuyao 已提交
1784 1785

// other properties are recovered from the execution plan
1786
void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) {
L
fix bug  
liuyao 已提交
1787
  if (!pBuff || len == 0) {
L
liuyao 已提交
1788 1789 1790
    return;
  }

1791 1792
  void* pUpInfo = pInfo->stateStore.updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
  int32_t      code = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo);
L
liuyao 已提交
1793 1794 1795 1796 1797
  if (code == TSDB_CODE_SUCCESS) {
    pInfo->pUpdateInfo = pUpInfo;
  }
}

L
Liu Jicong 已提交
1798 1799 1800
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
1801 1802
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;

L
Liu Jicong 已提交
1803 1804
  SStreamScanInfo* pInfo = pOperator->info;

1805
  qDebug("stream scan started, %s", GET_TASKID(pTaskInfo));
H
Haojun Liao 已提交
1806

1807 1808
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
L
Liu Jicong 已提交
1809
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1810
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
1811
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
H
Haojun Liao 已提交
1812 1813
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
1814
      qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion,
H
Haojun Liao 已提交
1815
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1816
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
1817
    } else {
H
Haojun Liao 已提交
1818 1819
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
1820
      qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64, pTSInfo->base.cond.startVersion,
H
Haojun Liao 已提交
1821
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1822
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
1823
    }
L
Liu Jicong 已提交
1824

1825
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1826

H
Haojun Liao 已提交
1827
    pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
1828
    pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1829

L
Liu Jicong 已提交
1830 1831
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
L
Liu Jicong 已提交
1832
    pTaskInfo->streamInfo.recoverScanFinished = false;
L
Liu Jicong 已提交
1833 1834
  }

5
54liuyao 已提交
1835 1836
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
L
Liu Jicong 已提交
1837 1838 1839 1840 1841
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }
5
54liuyao 已提交
1842 1843 1844 1845 1846 1847 1848

    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        printDataBlock(pInfo->pRecoverRes, "scan recover");
        return pInfo->pRecoverRes;
      } break;
5
54liuyao 已提交
1849 1850 1851 1852
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
1853
        printDataBlock(pInfo->pUpdateRes, "recover update");
5
54liuyao 已提交
1854 1855
        return pInfo->pUpdateRes;
      } break;
1856 1857 1858 1859 1860 1861 1862 1863 1864
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        printDataBlock(pInfo->pDeleteDataRes, "recover delete");
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
1865 1866 1867 1868 1869 1870
      case STREAM_SCAN_FROM_DATAREADER_RANGE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
1871
          printDataBlock(pSDB, "scan recover update");
5
54liuyao 已提交
1872 1873 1874 1875 1876 1877
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
5
54liuyao 已提交
1878 1879 1880 1881 1882 1883
      default:
        break;
    }

    pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
    if (pInfo->pRecoverRes != NULL) {
L
Liu Jicong 已提交
1884
      pInfo->blockRecoverContiCnt++;
5
54liuyao 已提交
1885
      calBlockTbName(pInfo, pInfo->pRecoverRes);
1886
      if (pInfo->pUpdateInfo) {
5
54liuyao 已提交
1887
        if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
1888
          TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
1889 1890 1891 1892
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
        } else {
          doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
        }
1893
      }
5
54liuyao 已提交
1894 1895
      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
1896
        printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
5
54liuyao 已提交
1897 1898
        return pInfo->pCreateTbRes;
      }
X
Xiaoyu Wang 已提交
1899
      qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
5
54liuyao 已提交
1900 1901
      printDataBlock(pInfo->pRecoverRes, "scan recover");
      return pInfo->pRecoverRes;
L
Liu Jicong 已提交
1902 1903
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
L
Liu Jicong 已提交
1904
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
1905
    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1906

H
Haojun Liao 已提交
1907
    pTSInfo->base.dataReader = NULL;
1908

H
Haojun Liao 已提交
1909 1910
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
L
Liu Jicong 已提交
1911

L
Liu Jicong 已提交
1912
    pTaskInfo->streamInfo.recoverScanFinished = true;
L
Liu Jicong 已提交
1913 1914 1915
    return NULL;
  }

5
54liuyao 已提交
1916
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
5
54liuyao 已提交
1917
// TODO: refactor
L
Liu Jicong 已提交
1918
FETCH_NEXT_BLOCK:
L
Liu Jicong 已提交
1919
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
1920
    if (pInfo->validBlockIndex >= total) {
L
Liu Jicong 已提交
1921
      doClearBufferedBlocks(pInfo);
H
Haojun Liao 已提交
1922 1923 1924
      return NULL;
    }

1925
    int32_t      current = pInfo->validBlockIndex++;
L
Liu Jicong 已提交
1926 1927
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
5
54liuyao 已提交
1928
    if (pBlock->info.parTbName[0]) {
1929
      pAPI->stateStore.streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
1930
    }
1931

1932
    // TODO move into scan
5
54liuyao 已提交
1933 1934
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
1935
    pBlock->info.dataLoad = 1;
L
fix bug  
liuyao 已提交
1936
    if (pInfo->pUpdateInfo) {
1937
      pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
L
fix bug  
liuyao 已提交
1938
    }
1939
    blockDataUpdateTsWindow(pBlock, 0);
1940
    switch (pBlock->info.type) {
L
Liu Jicong 已提交
1941 1942 1943
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
1944 1945 1946
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
1947
        copyDataBlock(pInfo->pUpdateRes, pBlock);
L
liuyao 已提交
1948
        pInfo->updateResIndex = 0;
1949
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
1950
        pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
1951 1952
      } break;
      case STREAM_DELETE_DATA: {
1953
        printDataBlock(pBlock, "stream scan delete recv");
L
Liu Jicong 已提交
1954
        SSDataBlock* pDelBlock = NULL;
L
Liu Jicong 已提交
1955
        if (pInfo->tqReader) {
L
Liu Jicong 已提交
1956
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
L
Liu Jicong 已提交
1957
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
L
Liu Jicong 已提交
1958 1959
        } else {
          pDelBlock = pBlock;
L
Liu Jicong 已提交
1960
        }
5
54liuyao 已提交
1961 1962
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
5
54liuyao 已提交
1963 1964 1965 1966 1967 1968
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
1969
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
L
Liu Jicong 已提交
1970
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
1971
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
L
Liu Jicong 已提交
1972
          printDataBlock(pDelBlock, "stream scan delete result");
H
Haojun Liao 已提交
1973 1974
          blockDataDestroy(pDelBlock);

L
Liu Jicong 已提交
1975 1976 1977 1978 1979
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1980 1981 1982
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
L
Liu Jicong 已提交
1983
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
1984 1985 1986
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
L
Liu Jicong 已提交
1987 1988 1989 1990
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
L
Liu Jicong 已提交
1991
          if (pInfo->pDeleteDataRes->info.rows > 0) {
5
54liuyao 已提交
1992
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
L
Liu Jicong 已提交
1993 1994 1995 1996
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1997
        }
1998 1999 2000
      } break;
      default:
        break;
5
54liuyao 已提交
2001
    }
2002
    // printDataBlock(pBlock, "stream scan recv");
2003
    return pBlock;
L
Liu Jicong 已提交
2004
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
L
Liu Jicong 已提交
2005
    qDebug("scan mode %d", pInfo->scanMode);
5
54liuyao 已提交
2006 2007 2008
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
5
54liuyao 已提交
2009
        doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
5
54liuyao 已提交
2010 2011 2012
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
2013 2014 2015
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
5
54liuyao 已提交
2016
      } break;
2017
      case STREAM_SCAN_FROM_DELETE_DATA: {
2018 2019 2020 2021 2022 2023 2024
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
2025 2026 2027 2028 2029 2030 2031 2032 2033 2034
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
2035
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
5
54liuyao 已提交
2036 2037
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
L
liuyao 已提交
2038
          printDataBlock(pSDB, "stream scan update");
L
Liu Jicong 已提交
2039
          calBlockTbName(pInfo, pSDB);
5
54liuyao 已提交
2040 2041
          return pSDB;
        }
2042
        blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
2043 2044 2045 2046
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
2047
    }
2048

2049
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
5
54liuyao 已提交
2050
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
2051 2052
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
5
54liuyao 已提交
2053 2054
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
2055
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
2056
      pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
2057
      return pInfo->pUpdateRes;
5
54liuyao 已提交
2058
    }
5
54liuyao 已提交
2059

2060 2061 2062
    const char*     id = GET_TASKID(pTaskInfo);
    SSDataBlock*    pBlock = pInfo->pRes;
    SDataBlockInfo* pBlockInfo = &pBlock->info;
H
Haojun Liao 已提交
2063
    int32_t         totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
2064

L
Liu Jicong 已提交
2065
  NEXT_SUBMIT_BLK:
2066
    while (1) {
2067
      if (pInfo->readerFn.tqReaderCurrentBlockConsumed(pInfo->tqReader)) {
2068
        if (pInfo->validBlockIndex >= totalBlocks) {
2069
          pAPI->stateStore.updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
2070 2071 2072 2073 2074 2075 2076 2077 2078 2079
          doClearBufferedBlocks(pInfo);

          qDebug("stream scan return empty, all %d submit blocks consumed, %s", totalBlocks, id);
          return NULL;
        }

        int32_t      current = pInfo->validBlockIndex++;
        SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);

        qDebug("set %d/%d as the input submit block, %s", current, totalBlocks, id);
2080
        if (pAPI->tqReaderFn.tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
2081 2082 2083 2084
          qError("submit msg messed up when initializing stream submit block %p, current %d/%d, %s", pSubmit, current, totalBlocks, id);
          continue;
        }
      }
2085

2086
      blockDataCleanup(pBlock);
2087

2088 2089 2090 2091
      while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
        SSDataBlock* pRes = NULL;
        int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
        if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
2092 2093 2094
          continue;
        }

2095
        setBlockIntoRes(pInfo, pRes, false);
2096

5
54liuyao 已提交
2097 2098 2099
        if (pInfo->pCreateTbRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return pInfo->pCreateTbRes;
2100 2101
        }

2102 2103 2104 2105
        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
        doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
        pBlock->info.dataLoad = 1;
        blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
2106

2107 2108 2109
        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
          break;
        }
2110
      }
H
Haojun Liao 已提交
2111

2112
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
5
54liuyao 已提交
2113
        break;
2114 2115
      } else {
        continue;
5
54liuyao 已提交
2116
      }
H
Haojun Liao 已提交
2117 2118 2119 2120
    }

    // record the scan action.
    pInfo->numOfExec++;
2121
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
H
Haojun Liao 已提交
2122

2123
    qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id);
L
Liu Jicong 已提交
2124
    if (pBlockInfo->rows > 0) {
2125
      return pBlock;
L
Liu Jicong 已提交
2126
    }
2127 2128 2129 2130 2131 2132

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
H
Haojun Liao 已提交
2133
  }
2134

2135
  return NULL;
H
Haojun Liao 已提交
2136 2137
}

H
Haojun Liao 已提交
2138
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2139 2140 2141
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2142 2143 2144
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2145 2146 2147 2148 2149 2150
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2151
/**
 * Produce the next result of a raw (tmq snapshot) scan.
 *
 * Behaviour depends on the type of pTaskInfo->streamInfo.currentOffset:
 *  - TMQ_OFFSET__SNAPSHOT_DATA: returns the next data block of the current reader if it has
 *    one. Otherwise asks the snapshot for the next table, re-prepares the scan on it (or on
 *    the log position when the snapshot reports uid 0) and returns NULL.
 *  - TMQ_OFFSET__SNAPSHOT_META: fetches the next meta entry and either publishes it through
 *    streamInfo.metaRsp or, when the context no longer queries meta, re-prepares the scan
 *    for data. Always returns NULL.
 *  - any other type: returns NULL.
 *
 * Reader errors and a killed task leave through T_LONG_JMP on pTaskInfo->env.
 */
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
  // NOTE: this operator never checks whether the current status is done or not
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*        pAPI = &pTaskInfo->storageAPI;
  SStreamRawScanInfo* pInfo = pOperator->info;

  // metaRspLen != 0 is used to judge whether the result is meta
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;

  qDebug("tmqsnap doRawScan called");

  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    if (pInfo->dataReader != NULL) {
      bool    hasNext = false;
      int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->dataReader, &hasNext);
      if (code != 0) {
        pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
        T_LONG_JMP(pTaskInfo->env, code);
      }

      if (hasNext) {
        if (isTaskKilled(pTaskInfo)) {
          pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
          T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
        }

        SSDataBlock* pDataBlock = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, NULL);
        if (pDataBlock == NULL) {
          T_LONG_JMP(pTaskInfo->env, terrno);
        }

        qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pDataBlock->info.id.uid);
        tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pDataBlock->info.id.uid,
                            pDataBlock->info.window.ekey);
        return pDataBlock;
      }
    }

    // no reader, or the current reader has no more blocks: move on to the next table
    SMetaTableInfo nextTable = pAPI->snapshotFn.getTableInfoFromSnapshot(pInfo->sContext);
    STqOffsetVal   nextOffset = {0};
    if (nextTable.uid != 0) {
      tqOffsetResetToData(&nextOffset, nextTable.uid, INT64_MIN);
      qDebug("tmqsnap change get data uid:%" PRId64 "", nextTable.uid);
    } else {
      // read snapshot done, change to get data from wal
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      tqOffsetResetToLog(&nextOffset, pInfo->sContext->snapVersion);
    }

    qStreamPrepareScan(pTaskInfo, &nextOffset, pInfo->sContext->subType);
    tDeleteSchemaWrapper(nextTable.schema);
    return NULL;
  }

  if (pTaskInfo->streamInfo.currentOffset.type != TMQ_OFFSET__SNAPSHOT_META) {
    return NULL;
  }

  SSnapContext* sContext = pInfo->sContext;
  void*         metaBuf = NULL;
  int32_t       metaLen = 0;
  int16_t       metaType = 0;
  int64_t       metaUid = 0;
  if (pAPI->snapshotFn.getMetaInfoFromSnapshot(sContext, &metaBuf, &metaLen, &metaType, &metaUid) < 0) {
    qError("tmqsnap getMetafromSnapShot error");
    taosMemoryFreeClear(metaBuf);
    return NULL;
  }

  if (sContext->queryMeta) {
    // hand the meta entry to the caller through the task's meta response
    tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, metaUid);
    pTaskInfo->streamInfo.metaRsp.resMsgType = metaType;
    pTaskInfo->streamInfo.metaRsp.metaRspLen = metaLen;
    pTaskInfo->streamInfo.metaRsp.metaRsp = metaBuf;
  } else {
    // change to get data on the next poll request
    STqOffsetVal dataOffset = {0};
    tqOffsetResetToData(&dataOffset, 0, INT64_MIN);
    qStreamPrepareScan(pTaskInfo, &dataOffset, pInfo->sContext->subType);
  }

  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2228
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2229
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
2230
  pRawScan->pAPI->tsdReader.tsdReaderClose(pRawScan->dataReader);
2231
  pRawScan->pAPI->snapshotFn.destroySnapshot(pRawScan->sContext);
2232
  tableListDestroy(pRawScan->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2233 2234 2235
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2236 2237 2238
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2239
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2240 2241 2242 2243 2244
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2245 2246
  int32_t code = TSDB_CODE_SUCCESS;

2247
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2248
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2249
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2250 2251
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2252 2253
  }

2254
  pInfo->pTableListInfo = tableListCreate();
wmmhello's avatar
wmmhello 已提交
2255 2256
  pInfo->vnode = pHandle->vnode;

2257
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2258 2259
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2260

2261
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2262
  return pOperator;
H
Haojun Liao 已提交
2263

L
Liu Jicong 已提交
2264
_end:
H
Haojun Liao 已提交
2265 2266 2267 2268
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2269 2270
}

2271
static void destroyStreamScanOperatorInfo(void* param) {
2272
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
2273

2274
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
2275
    destroyOperator(pStreamScan->pTableScanOp);
2276
  }
2277

2278
  if (pStreamScan->tqReader) {
2279
    pStreamScan->readerFn.tqReaderClose(pStreamScan->tqReader);
2280
  }
H
Haojun Liao 已提交
2281 2282
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2283
  }
C
Cary Xu 已提交
2284 2285
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2286
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2287
  }
C
Cary Xu 已提交
2288

L
Liu Jicong 已提交
2289
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
5
54liuyao 已提交
2290
  cleanupExprSupp(&pStreamScan->tagCalSup);
L
Liu Jicong 已提交
2291

2292
  pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo);
2293 2294 2295 2296
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2297
  blockDataDestroy(pStreamScan->pUpdateDataRes);
5
54liuyao 已提交
2298
  blockDataDestroy(pStreamScan->pCreateTbRes);
2299 2300 2301 2302
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2303
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2304
                                            STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2305
  SArray*          pColIds = NULL;
2306 2307
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2308
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;
2309

H
Haojun Liao 已提交
2310
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2311
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2312
    tableListDestroy(pTableListInfo);
2313
    goto _error;
H
Haojun Liao 已提交
2314 2315
  }

2316
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2317
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2318

2319
  pInfo->pTagCond = pTagCond;
2320
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2321

2322
  int32_t numOfCols = 0;
2323 2324
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2325
  if (code != TSDB_CODE_SUCCESS) {
2326
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2327 2328
    goto _error;
  }
2329

H
Haojun Liao 已提交
2330
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2331
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2332
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2333
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2334 2335

    int16_t colId = id->colId;
2336
    taosArrayPush(pColIds, &colId);
2337
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2338
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2339
    }
H
Haojun Liao 已提交
2340 2341
  }

L
Liu Jicong 已提交
2342 2343 2344 2345
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2346
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2347 2348
      goto _error;
    }
2349

L
Liu Jicong 已提交
2350 2351 2352
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
2353
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2354 2355 2356 2357
      goto _error;
    }
  }

2358 2359
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
5
54liuyao 已提交
2360
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
2361 2362
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2363
      tableListDestroy(pTableListInfo);
2364 2365 2366 2367
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2368
      tableListDestroy(pTableListInfo);
2369 2370 2371 2372
      goto _error;
    }
  }

L
Liu Jicong 已提交
2373
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
H
Haojun Liao 已提交
2374
  if (pInfo->pBlockLists == NULL) {
2375
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2376
    tableListDestroy(pTableListInfo);
2377
    goto _error;
H
Haojun Liao 已提交
2378 2379
  }

5
54liuyao 已提交
2380
  if (pHandle->vnode) {
2381
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
L
Liu Jicong 已提交
2382
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2383
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2384
      pTSInfo->base.cond.endVersion = pHandle->version;
2385
    }
L
Liu Jicong 已提交
2386

2387
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2388
    int32_t        num = 0;
2389
    tableListGetGroupList(pTableListInfo, 0, &pList, &num);
2390

2391
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2392
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2393
      pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
2394 2395
    }

L
Liu Jicong 已提交
2396 2397
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
2398
      pInfo->tqReader = pAPI->tqReaderFn.tqReaderOpen(pHandle->vnode);
L
Liu Jicong 已提交
2399
      ASSERT(pInfo->tqReader);
2400
    } else {
L
Liu Jicong 已提交
2401 2402
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2403 2404
    }

2405
    pInfo->pUpdateInfo = NULL;
2406
    pInfo->pTableScanOp = pTableScanOp;
2407
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
2408
      pAPI->stateStore.streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
2409
    }
L
Liu Jicong 已提交
2410

L
Liu Jicong 已提交
2411
    pInfo->readHandle = *pHandle;
L
Liu Jicong 已提交
2412
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
5
54liuyao 已提交
2413 2414
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
L
Liu Jicong 已提交
2415

L
Liu Jicong 已提交
2416
    // set the extract column id to streamHandle
2417
    pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds);
2418
    SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
2419
    code = pAPI->tqReaderFn.tqReaderSetQueryTableList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2420 2421 2422 2423
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
2424

L
Liu Jicong 已提交
2425
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2426
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2427 2428
  } else {
    taosArrayDestroy(pColIds);
2429
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2430
    pColIds = NULL;
5
54liuyao 已提交
2431 2432
  }

2433 2434 2435 2436 2437
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2438 2439 2440 2441 2442
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2443
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2444
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2445
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2446
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2447
  pInfo->groupId = 0;
2448
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2449
  pInfo->pStreamScanOp = pOperator;
2450
  pInfo->deleteDataIndex = 0;
2451
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2452
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2453
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2454
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2455
  pInfo->partitionSup.needCalc = false;
5
54liuyao 已提交
2456 2457
  pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
  pInfo->igExpired = pTableScanNode->igExpired;
2458
  pInfo->twAggSup.maxTs = INT64_MIN;
L
liuyao 已提交
2459
  pInfo->pState = NULL;
2460 2461
  pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
  pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
L
Liu Jicong 已提交
2462

L
fix bug  
liuyao 已提交
2463 2464 2465 2466
  // for stream
  if (pTaskInfo->streamInfo.pState) {
    void*   buff = NULL;
    int32_t len = 0;
2467 2468
    pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len);
    streamScanOperatorDecode(buff, len, pInfo);
L
fix bug  
liuyao 已提交
2469
  }
L
liuyao 已提交
2470

L
fix bug  
liuyao 已提交
2471
  setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
L
Liu Jicong 已提交
2472
                  pTaskInfo);
2473
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2474

2475
  __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2476 2477
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2478

H
Haojun Liao 已提交
2479
  return pOperator;
2480

L
Liu Jicong 已提交
2481
_error:
H
Haojun Liao 已提交
2482 2483 2484 2485 2486 2487 2488 2489
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2490 2491
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2492 2493
}

2494
/**
 * Fill row `count` of pRes with the values of the table at pInfo->curPos.
 *
 * @param pOperator  tag scan operator; supplies the expression list and the table list
 * @param pRes       result block whose columns are written
 * @param count      row index to write
 * @param mr         meta reader used to load the table entry
 * @param pAPI       storage API table used for the meta lookups
 *
 * If the table entry cannot be loaded, the reader is cleared and control leaves through
 * T_LONG_JMP with terrno.
 */
static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr, SStorageAPI* pAPI) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pTagScan = pOperator->info;
  SExprInfo*     pExprs = pOperator->exprSupp.pExprInfo;

  STableKeyInfo* pKey = tableListGetInfo(pTagScan->pTableListInfo, pTagScan->curPos);
  int32_t        code = pAPI->metaReaderFn.getTableEntryByUid(mr, pKey->uid);
  tDecoderClear(&mr->coder);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKey->uid, tstrerror(terrno),
           GET_TASKID(pTaskInfo));
    pAPI->metaReaderFn.clearReader(mr);
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  char          nameBuf[512];
  const int32_t numOfExprs = pOperator->exprSupp.numOfExprs;

  for (int32_t idx = 0; idx < numOfExprs; ++idx) {
    SExprInfo*       pExpr = &pExprs[idx];
    SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExpr->base.resSchema.slotId);

    // refactor later
    if (fmIsScanPseudoColumnFunc(pExpr->pExpr->_function.functionId)) {
      // pseudo column: emit the table name as a var-string
      STR_TO_VARSTR(nameBuf, mr->me.name);
      colDataSetVal(pDst, (count), nameBuf, false);
      continue;
    }

    // otherwise it is a tag value
    STagVal val = {0};
    val.cid = pExpr->base.pParam[0].pCol->colId;
    const char* p = pAPI->metaFn.extractTagVal(mr->me.ctbEntry.pTags, pDst->info.type, &val);

    const bool isJson = (pDst->info.type == TSDB_DATA_TYPE_JSON);
    const bool needConvert = (!isJson && p != NULL);

    char* data = needConvert ? tTagValToData((const STagVal*)p, false) : (char*)p;

    bool isNull = (data == NULL) || (isJson && tTagIsJsonNull(data));
    colDataSetVal(pDst, (count), data, isNull);

    // release the buffer only for converted variable-length tag values
    if (needConvert && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && data != NULL) {
      taosMemoryFree(data);
    }
  }
}

2539
/**
 * Produce the next block of the tag scan.
 *
 * Starting at pInfo->curPos, writes one row per table into pInfo->pRes until the table list
 * is exhausted or resultInfo.capacity rows were written. When an slimit is present the loop
 * stops after the first table at or beyond the slimit offset and tags the block with a group
 * id derived from that table's name.
 *
 * @return pInfo->pRes when at least one row was produced, NULL otherwise (including when the
 *         operator is already done or the table list is empty)
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  STagScanInfo* pInfo = pOperator->info;
  SSDataBlock*  pRes = pInfo->pRes;
  blockDataCleanup(pRes);

  int32_t size = tableListGetSize(pInfo->pTableListInfo);
  if (size == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  int32_t     count = 0;
  SMetaReader mr = {0};
  pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, 0);

  while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
    doTagScanOneTable(pOperator, pRes, count, &mr, pAPI);
    ++count;
    if (++pInfo->curPos >= size) {
      setOperatorCompleted(pOperator);
    }

    // each table with tbname is a group, hence its own block, but only group when slimit exists for performance reason.
    if (pInfo->pSlimit != NULL) {
      // NOTE(review): rows written for tables before the offset stay counted in `count`;
      // confirm this is the intended slimit behaviour.
      if (pInfo->curPos < pInfo->pSlimit->offset) {
        continue;
      }
      pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name));
      if (pInfo->curPos >= (pInfo->pSlimit->offset + pInfo->pSlimit->limit) - 1) {
        setOperatorCompleted(pOperator);
      }
      break;
    }
  }

  pAPI->metaReaderFn.clearReader(&mr);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pRes->info.rows = count;
  pOperator->resultInfo.totalRows += count;

  return (pRes->info.rows == 0) ? NULL : pRes;
}

2595
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2596 2597
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2598
  taosArrayDestroy(pInfo->matchInfo.pList);
2599
  pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
D
dapan1121 已提交
2600
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2601 2602
}

S
slzhou 已提交
2603
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
X
Xiaoyu Wang 已提交
2604
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
2605
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
2606 2607 2608 2609 2610
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2611 2612 2613 2614
  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
2615
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
2616 2617 2618
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2619

H
Haojun Liao 已提交
2620 2621
  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
2622 2623 2624
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2625

2626
  pInfo->pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
2627
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2628 2629
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
2630
  pInfo->pSlimit = (SLimitNode*)pPhyNode->node.pSlimit; //TODO: slimit now only indicate group
2631

L
Liu Jicong 已提交
2632 2633
  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2634
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2635 2636
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

L
Liu Jicong 已提交
2637 2638
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
2639 2640

  return pOperator;
2641

2642
_error:
H
Haojun Liao 已提交
2643 2644 2645 2646 2647
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2648

dengyihao's avatar
dengyihao 已提交
2649
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2650 2651 2652 2653
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
2654 2655
  SStorageAPI* pAPI = &pTaskInfo->storageAPI;

dengyihao's avatar
opt mem  
dengyihao 已提交
2656 2657
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
D
dapan1121 已提交
2658
  int32_t                         code = 0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2659

H
Haojun Liao 已提交
2660
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2661

L
Liu Jicong 已提交
2662
  int64_t      st = taosGetTimestampUs();
2663
  void*        p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2664
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2665

D
dapan1121 已提交
2666
  if (NULL == source->dataReader || !source->multiReader) {
2667
    code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL);
D
dapan1121 已提交
2668 2669 2670
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
dengyihao's avatar
dengyihao 已提交
2671
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2672

D
dapan1121 已提交
2673
  pInfo->base.dataReader = source->dataReader;
H
Haojun Liao 已提交
2674
  STsdbReader* reader = pInfo->base.dataReader;
X
Xiaoyu Wang 已提交
2675
  bool         hasNext = false;
2676
  qTrace("tsdb/read-table-data: %p, enter next reader", reader);
D
dapan1121 已提交
2677 2678

  while (true) {
2679
    code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
D
dapan1121 已提交
2680
    if (code != 0) {
2681
      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
D
dapan1121 已提交
2682 2683 2684 2685 2686 2687 2688
      pInfo->base.dataReader = NULL;
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
2689

H
Haojun Liao 已提交
2690
    if (isTaskKilled(pTaskInfo)) {
2691
      pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
D
dapan1121 已提交
2692
      pInfo->base.dataReader = NULL;
2693
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2694 2695 2696
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2697
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2698 2699 2700 2701
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2702
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2703 2704 2705 2706
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2707 2708

    uint32_t status = 0;
2709
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2710
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2711
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2712
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2713 2714
    }

2715 2716 2717 2718
    if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
      break;
    }

dengyihao's avatar
opt mem  
dengyihao 已提交
2719 2720 2721 2722 2723
    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

2724
    pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2725

H
Haojun Liao 已提交
2726
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2727
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2728

2729
    qTrace("tsdb/read-table-data: %p, close reader", reader);
D
dapan1121 已提交
2730
    if (!source->multiReader) {
2731
      pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
D
dapan1121 已提交
2732 2733
      source->dataReader = NULL;
    }
H
Haojun Liao 已提交
2734
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2735 2736
    return pBlock;
  }
H
Haojun Liao 已提交
2737

D
dapan1121 已提交
2738
  if (!source->multiReader) {
2739
    pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
D
dapan1121 已提交
2740 2741
    source->dataReader = NULL;
  }
H
Haojun Liao 已提交
2742
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2743 2744 2745
  return NULL;
}

2746 2747 2748
// Build a one-element SBlockOrderInfo list that sorts on the output slot of the primary timestamp column.
// The slot is taken from the last entry of colMatchInfo whose colId is PRIMARYKEY_TIMESTAMP_COL_ID
// (slot 0 when there is none); NULLs are ordered first.
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsSlot = 0;
  size_t  numOfItems = taosArrayGetSize(colMatchInfo);

  for (int32_t idx = 0; idx < numOfItems; ++idx) {
    SColMatchItem* pItem = taosArrayGet(colMatchInfo, idx);
    if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
      continue;
    }
    tsSlot = pItem->dstSlotId;
  }

  SArray* pOrderList = taosArrayInit(1, sizeof(SBlockOrderInfo));

  SBlockOrderInfo orderInfo = {0};
  orderInfo.nullFirst = NULL_ORDER_FIRST;
  orderInfo.slotId = tsSlot;
  orderInfo.order = order;
  taosArrayPush(pOrderList, &orderInfo);

  return pOrderList;
}

H
Haojun Liao 已提交
2766
/*
 * Copy a query condition, giving `dst` its own colList array so that it can be released independently of `src`.
 *
 * Returns 0 on success. Returns TSDB_CODE_OUT_OF_MEMORY when the column list cannot be allocated; in that case
 * dst->colList is NULL (it never aliases src->colList on return).
 */
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
  memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));

  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  // A NULL result is only an error when columns were actually requested: a zero-sized allocation may
  // legitimately yield NULL.
  if (dst->colList == NULL && src->numOfCols > 0) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int i = 0; i < src->numOfCols; i++) {
    dst->colList[i] = src->colList[i];
  }
  return 0;
}
H
Haojun Liao 已提交
2774

2775
/*
 * Start the merge scan of one table group.
 *
 * Starting at pInfo->tableStartIndex, finds the last consecutive table whose groupId equals pInfo->groupId,
 * creates a multi-source merge sort handle with one source (and one private query condition) per table in that
 * range, and opens the sort. Returns TSDB_CODE_SUCCESS; failures long-jump to pTaskInfo->env.
 */
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  {
    // Extend the range until the group id changes or the table list ends.
    size_t  numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < numOfTables; ++i) {
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  pInfo->base.dataReader = NULL;

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  for (int32_t i = 0; i < numOfTable; ++i) {
    // Copy the condition first: if this fails nothing has been allocated for the iteration yet, so
    // sortSourceParams and queryConds stay the same length for the cleanup code.
    SQueryTableDataCond cond;
    int32_t             condCode = dumpQueryTableCond(&pInfo->base.cond, &cond);
    if (condCode != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, condCode);
    }

    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);

    taosArrayPush(pInfo->sortSourceParams, &param);
    taosArrayPush(pInfo->queryConds, &cond);
  }

  for (int32_t i = 0; i < numOfTable; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    ps->onlyRef = true;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  int32_t code = tsortOpen(pInfo->pSortHandle);

  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  return TSDB_CODE_SUCCESS;
}

// Tear down the per-group state created by startGroupTableMergeScan: accumulate the sort statistics, release
// every source's input block and reader, destroy the sort handle and the per-table query conditions, and reset
// the limit info for the next group. Always returns TSDB_CODE_SUCCESS.
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pScan = pOperator->info;
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  SStorageAPI*         pStorage = &pTask->storageAPI;

  int32_t numOfSources = taosArrayGetSize(pScan->queryConds);

  // Method/buffer describe the latest group; the counters are summed over all groups.
  SSortExecInfo groupStat = tsortGetSortExecInfo(pScan->pSortHandle);
  pScan->sortExecInfo.sortMethod = groupStat.sortMethod;
  pScan->sortExecInfo.sortBuffer = groupStat.sortBuffer;
  pScan->sortExecInfo.loops += groupStat.loops;
  pScan->sortExecInfo.readBytes += groupStat.readBytes;
  pScan->sortExecInfo.writeBytes += groupStat.writeBytes;

  for (int32_t idx = 0; idx < numOfSources; ++idx) {
    STableMergeScanSortSourceParam* pSrc = taosArrayGet(pScan->sortSourceParams, idx);
    blockDataDestroy(pSrc->inputBlock);
    pStorage->tsdReader.tsdReaderClose(pSrc->dataReader);
    pSrc->dataReader = NULL;
  }
  taosArrayClear(pScan->sortSourceParams);

  tsortDestroySortHandle(pScan->pSortHandle);
  pScan->pSortHandle = NULL;

  size_t numOfConds = taosArrayGetSize(pScan->queryConds);
  for (int32_t idx = 0; idx < numOfConds; idx++) {
    SQueryTableDataCond* pCond = taosArrayGet(pScan->queryConds, idx);
    taosMemoryFree(pCond->colList);
  }
  taosArrayDestroy(pScan->queryConds);
  pScan->queryConds = NULL;

  resetLimitInfoForNextGroup(&pScan->limitInfo);
  return TSDB_CODE_SUCCESS;
}

2877 2878
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2879 2880
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2881 2882 2883
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2884
  blockDataCleanup(pResBlock);
2885 2886

  while (1) {
2887
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2888 2889 2890 2891
    if (pTupleHandle == NULL) {
      break;
    }

2892 2893
    appendOneRowToDataBlock(pResBlock, pTupleHandle);
    if (pResBlock->info.rows >= capacity) {
2894 2895 2896 2897
      break;
    }
  }

2898
  bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
D
dapan1121 已提交
2899
  qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
2900
         pInfo->limitInfo.numOfOutputRows);
2901

2902
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914
}

// "Get next block" entry of the table-merge-scan operator. Tables are processed group by group: each call
// returns the next sorted block of the current group, moving on to the following group once the current one is
// drained. Returns NULL when the operator is done or there is nothing to scan.
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  STableMergeScanInfo* pScan = pOperator->info;

  int32_t openCode = pOperator->fpSet._openFn(pOperator);
  if (openCode != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTask->env, openCode);
  }

  size_t numOfTables = tableListGetSize(pScan->base.pTableListInfo);

  // First invocation: position on the first table and start its group.
  if (!pScan->hasGroupId) {
    pScan->hasGroupId = true;

    if (numOfTables == 0) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pScan->tableStartIndex = 0;
    STableKeyInfo* pFirst = tableListGetInfo(pScan->base.pTableListInfo, pScan->tableStartIndex);
    pScan->groupId = pFirst->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pScan->tableStartIndex < numOfTables) {
    if (isTaskKilled(pTask)) {
      T_LONG_JMP(pTask->env, pTask->code);
    }

    SSDataBlock* pSorted = getSortedTableMergeScanBlockData(pScan->pSortHandle, pScan->pResBlock,
                                                            pOperator->resultInfo.capacity, pOperator);
    if (pSorted != NULL) {
      pSorted->info.id.groupId = pScan->groupId;
      pOperator->resultInfo.totalRows += pSorted->info.rows;
      return pSorted;
    }

    // Data of this group are all dumped, let's try the next group
    stopGroupTableMergeScan(pOperator);
    if (pScan->tableEndIndex >= numOfTables - 1) {
      setOperatorCompleted(pOperator);
      break;
    }

    pScan->tableStartIndex = pScan->tableEndIndex + 1;
    STableKeyInfo* pNext = tableListGetInfo(pScan->base.pTableListInfo, pScan->tableStartIndex);
    pScan->groupId = pNext->groupId;
    startGroupTableMergeScan(pOperator);
    resetLimitInfoForNextGroup(&pScan->limitInfo);
  }

  // Reaching this point means no block was produced.
  return NULL;
}

2961
void destroyTableMergeScanOperatorInfo(void* param) {
2962
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2963
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2964

dengyihao's avatar
dengyihao 已提交
2965 2966 2967
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2968 2969
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
2970
    pTableScanInfo->base.readerAPI.tsdReaderClose(p->dataReader);
D
dapan1121 已提交
2971
    p->dataReader = NULL;
2972
  }
H
Haojun Liao 已提交
2973

2974
  pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
D
dapan1121 已提交
2975 2976
  pTableScanInfo->base.dataReader = NULL;

2977
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2978 2979
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2980

dengyihao's avatar
opt mem  
dengyihao 已提交
2981 2982 2983
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2984 2985
  }

2986
  taosArrayDestroy(pTableScanInfo->queryConds);
2987
  destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
2988 2989 2990 2991 2992

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
D
dapan1121 已提交
2993
  taosMemoryFreeClear(param);
2994 2995 2996 2997
}

/*
 * Produce the explain-analyze payload of the table-merge-scan operator.
 *
 * On success stores a newly allocated STableMergeScanExecInfo (block-load recorder plus sort statistics) in
 * *pOptrExplain and its size in *len, and returns TSDB_CODE_SUCCESS.
 * Returns TSDB_CODE_OUT_OF_MEMORY, leaving both outputs untouched, when the payload cannot be allocated.
 */
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);
  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->base.readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3010
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
3011
                                                STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
3012 3013 3014 3015 3016
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3017

3018 3019 3020
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
3021
  int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
H
Haojun Liao 已提交
3022
                                     &pInfo->base.matchInfo);
H
Haojun Liao 已提交
3023 3024 3025
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3026

H
Haojun Liao 已提交
3027
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
3028
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3029
    taosArrayDestroy(pInfo->base.matchInfo.pList);
3030 3031 3032 3033
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
3034
    SExprSupp* pSup = &pInfo->base.pseudoSup;
3035 3036
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
3037 3038 3039 3040
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

H
Haojun Liao 已提交
3041 3042 3043 3044 3045 3046
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

H
Haojun Liao 已提交
3047
  pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
H
Haojun Liao 已提交
3048 3049
  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
3050
  pInfo->base.readHandle = *readHandle;
3051 3052 3053

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;
3054
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
3055

3056
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3057
  pInfo->sample.seed = taosGetTimestampSec();
H
Haojun Liao 已提交
3058 3059 3060 3061 3062 3063

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
3064
  initResultSizeInfo(&pOperator->resultInfo, 1024);
H
Haojun Liao 已提交
3065
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
3066 3067
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

3068
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3069

H
Haojun Liao 已提交
3070
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
3071
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3072
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
3073

dengyihao's avatar
dengyihao 已提交
3074
  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
A
Alex Duan 已提交
3075 3076
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
3077

L
Liu Jicong 已提交
3078 3079
  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3080
  pOperator->exprSupp.numOfExprs = numOfCols;
3081

3082 3083
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
3084 3085 3086 3087 3088 3089 3090 3091 3092
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
3093 3094 3095 3096

// ====================================================================================================================
// TableCountScanOperator
// Forward declaration: the function is installed into the operator's function set by
// createTableCountScanOperatorInfo() further down, so it must be declared before that point.
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
3097
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
3098
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3099
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI);
S
slzhou 已提交
3100
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3101
                                                   SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI);
S
slzhou 已提交
3102 3103
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
3104 3105
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
3106 3107 3108 3109 3110 3111 3112
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173
// Column names that the table-count-scan helpers below compare against SColumnNode::colName to recognise the
// database-name and super-table-name columns.
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

// Record in `supp` the output slot ids of the db-name and stable-name columns found in `scanCols`.
// Every list entry must be a target node wrapping a column node, otherwise TSDB_CODE_QRY_SYS_ERROR is
// returned. A NULL list is accepted and leaves `supp` untouched.
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCur = NULL;
  FOREACH(pCur, scanCols) {
    if (nodeType(pCur) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pCur;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SColumnNode* pCol = (SColumnNode*)(pTarget->pExpr);
    if (strcmp(pCol->colName, GROUP_TAG_DB_NAME) == 0) {
      supp->dbNameSlotId = pTarget->slotId;
      continue;
    }
    if (strcmp(pCol->colName, GROUP_TAG_STABLE_NAME) == 0) {
      supp->stbNameSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Record in `supp` the output slot id of the table-count function found in `pseudoCols`.
// Every list entry must be a target node wrapping a function node, otherwise TSDB_CODE_QRY_SYS_ERROR is
// returned. A NULL list is accepted and leaves `supp` untouched.
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCur = NULL;
  FOREACH(pCur, pseudoCols) {
    if (nodeType(pCur) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pCur;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_FUNCTION) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SFunctionNode* pFunc = (SFunctionNode*)(pTarget->pExpr);
    if (pFunc->funcType != FUNCTION_TYPE_TABLE_COUNT) {
      continue;
    }
    supp->tbCountSlotId = pTarget->slotId;
  }

  return TSDB_CODE_SUCCESS;
}

// Derive the scan inputs. Without group tags, the db and stable names of `tableName` are copied into the
// filter fields of `supp`. With group tags, each tag must be a column node (TSDB_CODE_QRY_SYS_ERROR otherwise)
// and the group-by flags are raised for the db-name / stable-name columns.
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags == NULL) {
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCur = NULL;
  FOREACH(pCur, groupTags) {
    if (nodeType(pCur) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SColumnNode* pCol = (SColumnNode*)pCur;
    if (strcmp(pCol->colName, GROUP_TAG_DB_NAME) == 0) {
      supp->groupByDbName = true;
    }
    if (strcmp(pCol->colName, GROUP_TAG_STABLE_NAME) == 0) {
      supp->groupByStbName = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Fill `supp` for a table-count scan in three steps: group-by flags / name filters, slot ids of the group-tag
// columns, slot id of the count column. All slot ids are reset to -1 before they are looked up. The first
// failing step is logged and its code returned.
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t status = tblCountScanGetInputs(groupTags, tableName, supp);
  if (status != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return status;
  }

  // -1 marks a slot as "not present in the output".
  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  status = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (status != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return status;
  }

  status = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (status != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
  }
  return status;
}
S
shenglian zhou 已提交
3204

S
slzhou 已提交
3205
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3206 3207 3208
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3209
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3210
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3211
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3212 3213 3214 3215 3216 3217 3218 3219 3220

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3221
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3222 3223
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3224 3225 3226
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3227 3228 3229

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3230 3231
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3243 3244 3245
// Writes one result row (row index 0) into pRes and sets pRes->info.rows to 1.
//
// pSupp    slot ids of the output columns; a slot id of -1 means the column is
//          not part of the output and is skipped.
// dbName   database name; must be non-empty when the db-name column is requested.
// stbName  super table name; an empty string is emitted as NULL.
// count    table count stored in the count column.
// pRes     destination data block.
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
    ASSERT(strlen(dbName));
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);

    char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);

    // Take the length from the bounded copy, not from the source, so the varstr
    // header can never describe more bytes than the buffer actually holds.
    varDataSetLen(varDbName, strlen(varDataVal(varDbName)));
    colDataSetVal(colInfoData, 0, varDbName, false);
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
    if (stbName[0] != '\0') {
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      // Same bounded, always-terminated copy as used for the db name above.
      tstrncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
      varDataSetLen(varStbName, strlen(varDataVal(varStbName)));
      colDataSetVal(colInfoData, 0, varStbName, false);
    } else {
      colDataSetNULL(colInfoData, 0);
    }
  }

  if (pSupp->tbCountSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
    colDataSetVal(colInfoData, 0, (char*)&count, false);
  }
  pRes->info.rows = 1;
}

S
slzhou 已提交
3275
// Produces the next table-count block for the system databases.
// Fetches the table counts via getInfosDbMeta / getPerfDbMeta, then delegates to the
// grouped or the filtered builder depending on the group-by flags in pInfo->supp.
// Returns pInfo->pRes when it holds at least one row, otherwise NULL.
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
  STableCountScanSupp* pSupp = &pInfo->supp;
  SSDataBlock*         pRes = pInfo->pRes;

  size_t infodbTableNum;
  size_t perfdbTableNum;
  getInfosDbMeta(NULL, &infodbTableNum);
  getPerfDbMeta(NULL, &perfdbTableNum);

  if (!pSupp->groupByDbName && !pSupp->groupByStbName) {
    buildSysDbFilterTableCount(pOperator, pSupp, pRes, infodbTableNum, perfdbTableNum);
  } else {
    buildSysDbGroupedTableCount(pOperator, pInfo, pSupp, pRes, infodbTableNum, perfdbTableNum);
  }

  if (pRes->info.rows > 0) {
    return pRes;
  }
  return NULL;
}

S
slzhou 已提交
3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308
// Non-grouped system-db count: emits at most one row chosen by pSupp->dbNameFilter.
//   empty filter              -> sum of both system databases, empty db name
//   information schema filter -> infodbTableNum
//   performance schema filter -> perfdbTableNum
//   any other filter          -> no row
// The operator is always marked completed afterwards.
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  if (pSupp->dbNameFilter[0] == '\0') {
    fillTableCountScanDataBlock(pSupp, "", "", infodbTableNum + perfdbTableNum, pRes);
  } else if (0 == strcmp(pSupp->dbNameFilter, TSDB_INFORMATION_SCHEMA_DB)) {
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (0 == strcmp(pSupp->dbNameFilter, TSDB_PERFORMANCE_SCHEMA_DB)) {
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  }

  setOperatorCompleted(pOperator);
}

// Grouped system-db count: each call handles the group selected by pInfo->currGrpIdx
// (0 = information schema, 1 = performance schema) and then advances the index.
// Any other index marks the operator as completed and emits nothing.
// The group id is derived from the db name when grouping by db name, otherwise from
// the empty string.
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  char*  sysDbName = NULL;
  size_t sysDbTableNum = 0;

  switch (pInfo->currGrpIdx) {
    case 0:
      sysDbName = TSDB_INFORMATION_SCHEMA_DB;
      sysDbTableNum = infodbTableNum;
      break;
    case 1:
      sysDbName = TSDB_PERFORMANCE_SCHEMA_DB;
      sysDbTableNum = perfdbTableNum;
      break;
    default:
      break;
  }

  if (sysDbName == NULL) {
    setOperatorCompleted(pOperator);
  } else {
    uint64_t groupId = 0;
    if (pSupp->groupByDbName) {
      groupId = calcGroupId(sysDbName, strlen(sysDbName));
    } else {
      groupId = calcGroupId("", 0);
    }

    pRes->info.id.groupId = groupId;
    fillTableCountScanDataBlock(pSupp, sysDbName, "", sysDbTableNum, pRes);
  }

  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3334
// Next-block function of the table-count scan operator.
// The result block is cleared on every call, before the status check.
// Returns NULL once the operator status is OP_EXEC_DONE; otherwise dispatches to
// buildSysDbTableCount when readHandle.mnd is set, else to buildVnodeDbTableCount.
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
  STableCountScanOperatorInfo* pInfo = pOperator->info;

  blockDataCleanup(pInfo->pRes);

  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  if (pInfo->readHandle.mnd == NULL) {
    return buildVnodeDbTableCount(pOperator, pInfo, &pInfo->supp, pInfo->pRes);
  }

  return buildSysDbTableCount(pOperator, pInfo);
}

// Produces the next table-count block for the database served by this vnode.
// Resolves the plain db name from the name reported by getBasicInfo, then delegates
// to the grouped or the filtered builder depending on the group-by flags.
// Returns pRes when it holds at least one row, otherwise NULL.
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;

  // ask the storage layer for the db name and vgroup id
  const char* fullDbName = NULL;
  int32_t     vgId = 0;
  pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &fullDbName, &vgId, NULL, NULL);

  // parse the reported name (account + db parts) and extract the db part
  SName name = {0};
  char  dbName[TSDB_DB_NAME_LEN] = {0};
  tNameFromString(&name, fullDbName, T_NAME_ACCT | T_NAME_DB);
  tNameGetDbName(&name, dbName);

  if (!pSupp->groupByDbName && !pSupp->groupByStbName) {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
  } else {
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vgId, dbName);
  }

  if (pRes->info.rows > 0) {
    return pRes;
  }
  return NULL;
}

// Grouped table count for one vnode.
//
// Not grouping by stable name: emits a single row with the count reported by
// getBasicInfo, grouped by db name, and completes the operator.
//
// Grouping by stable name: the stable uid list is loaded once and cached in pInfo;
// each call then handles the group selected by pInfo->currGrpIdx:
//   index <  list size -> the stable at that index
//   index == list size -> the extra group built by buildVnodeGroupedNtbTableCount
//   index >  list size -> operator completed
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;

  if (!pSupp->groupByStbName) {
    int64_t dbTableCount = 0;
    pRes->info.id.groupId = calcGroupId(dbName, strlen(dbName));
    pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &dbTableCount, NULL);

    fillTableCountScanDataBlock(pSupp, dbName, "", dbTableCount, pRes);
    setOperatorCompleted(pOperator);
    return;
  }

  if (pInfo->stbUidList == NULL) {
    pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
    if (pAPI->metaFn.storeGetTableList(pInfo->readHandle.vnode, 0, pInfo->stbUidList, TSDB_SUPER_TABLE) < 0) {
      // a failure is only logged; the scan continues with whatever the list holds
      qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
    }
  }

  if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) {
    tb_uid_t* pStbUid = taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
    buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, *pStbUid, pAPI);
    pInfo->currGrpIdx++;
    return;
  }

  if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) {
    buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName, pAPI);
    pInfo->currGrpIdx++;
    return;
  }

  setOperatorCompleted(pOperator);
}

// Non-grouped table count for one vnode; emits exactly one row and completes the
// operator.
//
// When both a db filter and a stable filter are present, the row carries the number
// of child tables of that stable. In every other case the row carries the vnode's
// table count as reported by getBasicInfo.
//
// The "db filter only" case previously emitted a hard-coded 0 because its counting
// call had been commented out; it now uses the same getBasicInfo query as the
// unfiltered case.
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;

  if (strlen(pSupp->dbNameFilter) != 0 && strlen(pSupp->stbNameFilter) != 0) {
    // uid stays 0 if the lookup does not set it; the count then stays 0 as well
    // unless getNumOfChildTables reports otherwise.
    tb_uid_t uid = 0;
    pAPI->metaFn.getTableUidByName(pInfo->readHandle.vnode, pSupp->stbNameFilter, &uid);

    int64_t numOfChildTables = 0;
    pAPI->metaFn.getNumOfChildTables(pInfo->readHandle.vnode, uid, &numOfChildTables);

    fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numOfChildTables, pRes);
  } else {
    int64_t tbNumVnode = 0;
    pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, &tbNumVnode, NULL);
    fillTableCountScanDataBlock(pSupp, dbName, "", tbNumVnode, pRes);
  }

  setOperatorCompleted(pOperator);
}

static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3436
                                           SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI) {
S
slzhou 已提交
3437
  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
D
dapan1121 已提交
3438 3439 3440
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
  }
X
Xiaoyu Wang 已提交
3441

S
slzhou 已提交
3442 3443
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;
3444 3445 3446 3447 3448

  int64_t numOfTables = 0;//metaGetNtbNum(pInfo->readHandle.vnode);
  pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, NULL, &numOfTables);
  if (numOfTables != 0) {
    fillTableCountScanDataBlock(pSupp, dbName, "", numOfTables, pRes);
3449
  }
S
slzhou 已提交
3450 3451 3452
}

static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
3453
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI) {
S
slzhou 已提交
3454
  char stbName[TSDB_TABLE_NAME_LEN] = {0};
3455
  pAPI->metaFn.getTableNameByUid(pInfo->readHandle.vnode, stbUid, stbName);
S
slzhou 已提交
3456 3457

  char fullStbName[TSDB_TABLE_FNAME_LEN] = {0};
D
dapan1121 已提交
3458 3459 3460 3461 3462
  if (pSupp->groupByDbName) {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
  } else {
    snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s", stbName);
  }
X
Xiaoyu Wang 已提交
3463

S
slzhou 已提交
3464 3465 3466 3467
  uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName));
  pRes->info.id.groupId = groupId;

  SMetaStbStats stats = {0};
3468
//  metaGetStbStats(pInfo->readHandle.vnode, stbUid, &stats);
S
slzhou 已提交
3469 3470 3471
  int64_t ctbNum = stats.ctbNum;

  fillTableCountScanDataBlock(pSupp, dbName, stbName, ctbNum, pRes);
S
shenglian zhou 已提交
3472 3473 3474
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3475
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3476 3477
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3478
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3479 3480
  taosMemoryFreeClear(param);
}