scanoperator.c 130.1 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#include "executorimpl.h"
H
Haojun Liao 已提交
17
#include "filter.h"
18
#include "function.h"
19
#include "functionMgt.h"
L
Liu Jicong 已提交
20
#include "os.h"
H
Haojun Liao 已提交
21
#include "querynodes.h"
22
#include "systable.h"
H
Haojun Liao 已提交
23
#include "tname.h"
24
#include "ttime.h"
H
Haojun Liao 已提交
25 26 27 28 29 30 31 32 33

#include "tdatablock.h"
#include "tmsg.h"

#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"

D
dapan1121 已提交
34 35
// Debug switch for the scan operators; 0 by default. It is not read in the visible part of this file.
int32_t scanDebug = 0;

// NOTE(review): presumably the table-count threshold used by the multi-reader merge scan -- confirm at its use site.
#define MULTI_READER_MAX_TABLE_NUM 5000

// Tag a scan-info object (anything with a `scanFlag` member) as performing the reverse scan.
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)

// Flip `n` in place: ASC becomes DESC, any other value becomes ASC. `n` is evaluated more than once.
#define SWITCH_ORDER(n) ((n) = (((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
39

H
Haojun Liao 已提交
40 41 42 43 44 45 46 47 48 49
// Pairs a file-block load recorder with a sort execution record.
// NOTE(review): judging by the name this is the statistics payload of the table merge scan operator; its users are
// outside this excerpt -- confirm.
typedef struct STableMergeScanExecInfo {
  SFileBlockLoadRecorder blockRecorder;  // block loading counters (same type as STableScanBase::readRecorder)
  SSortExecInfo          sortExecInfo;   // sort statistics
} STableMergeScanExecInfo;

typedef struct STableMergeScanSortSourceParam {
  SOperatorInfo* pOperator;
  int32_t        readerIdx;
  uint64_t       uid;
  SSDataBlock*   inputBlock;
D
dapan1121 已提交
50 51
  bool           multiReader;
  STsdbReader*   dataReader;
H
Haojun Liao 已提交
52 53
} STableMergeScanSortSourceParam;

54 55 56 57 58 59 60 61 62 63
// Runtime state of the table count scan operator.
// NOTE(review): the operator implementation is outside this excerpt; field notes other than the original one on
// stbUidList are assumptions drawn from the names -- confirm.
typedef struct STableCountScanOperatorInfo {
  SReadHandle  readHandle;  // storage/meta access handle
  SSDataBlock* pRes;        // presumably the result block handed to the caller

  STableCountScanSupp supp;  // scan support parameters

  int32_t currGrpIdx;  // presumably the index of the group currently being produced
  SArray* stbUidList;  // when group by db_name and/or stable_name
} STableCountScanOperatorInfo;

L
Liu Jicong 已提交
64
static bool processBlockWithProbability(const SSampleExecInfo* pInfo);

/*
 * Decide whether the current data block takes part in the scan.
 *
 * Sampling is compiled out at the moment, so every block is accepted and `pInfo` is not read. The disabled
 * branch keeps the intended sampling rule (accept one block out of every 1/sampleRatio) for reference.
 */
static bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
  if (pInfo->sampleRatio == 1) {
    return true;
  }

  uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
  return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#else
  return true;
#endif
}

79
/*
 * Reverse the scan order stored in each of the first `numOfOutput` function contexts:
 * ASC turns into DESC, everything else turns into ASC.
 */
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  int32_t idx = 0;
  while (idx < numOfOutput) {
    SqlFunctionCtx* pCur = &pCtx[idx];
    pCur->order = (pCur->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
    idx += 1;
  }
}

85 86 87 88 89 90 91 92 93
/*
 * Advance the time window `tw` in place to the next window in the scan direction given by `order`.
 *
 * For every interval unit except 'n' and 'y' the window is shifted by `sliding` and its end is recomputed from
 * `interval`. For 'n' and 'y' the step is done on the broken-down local time in units of months
 * (a 'y' interval counts as 12 months each), and the results are converted back to the interval's precision.
 */
static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) {
  const int32_t direction = GET_FORWARD_DIRECTION_FACTOR(order);
  const bool    monthBased = (pInterval->intervalUnit == 'n') || (pInterval->intervalUnit == 'y');

  if (!monthBased) {
    tw->skey += pInterval->sliding * direction;
    tw->ekey = tw->skey + pInterval->interval - 1;
    return;
  }

  // length of one window, expressed in months
  int64_t months = pInterval->interval;
  if (pInterval->intervalUnit == 'y') {
    months *= 12;
  }

  // the current window start, in seconds
  int64_t startSec = convertTimePrecision(tw->skey, pInterval->precision, TSDB_TIME_PRECISION_MILLI) / 1000;
  time_t  t = (time_t)startSec;

  struct tm tm;
  taosLocalTime(&t, &tm, NULL);

  // start of the next window: shift the month counter by one window in the scan direction
  int mon = (int)(tm.tm_year * 12 + tm.tm_mon + months * direction);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->skey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);

  // end of the next window: one tick before the start of the window that follows it
  mon = (int)(mon + months);
  tm.tm_year = mon / 12;
  tm.tm_mon = mon % 12;
  tw->ekey = convertTimePrecision((int64_t)taosMktime(&tm) * 1000LL, TSDB_TIME_PRECISION_MILLI, pInterval->precision);
  tw->ekey -= 1;
}

118
/*
 * Check whether the time range of a data block is covered by more than one interval window.
 *
 * @param pInterval   interval definition; an `interval` of 0 means the upstream operator is not an interval
 *                    operator, in which case the answer is always false
 * @param pBlockInfo  block whose [window.skey, window.ekey] range is examined
 * @param order       scan order; decides from which end of the block the windows are walked
 * @return true when the window aligned to the leading edge of the block does not cover the whole block, or when
 *         a following window (in scan direction) still intersects the block; false otherwise.
 */
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
  STimeWindow w = {0};

  // 0 by default, which means the upstream operator is not an interval operator.
  if (pInterval->interval == 0) {
    return false;
  }

  if (order == TSDB_ORDER_ASC) {
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey);
    ASSERT(w.ekey >= pBlockInfo->window.skey);

    // the first window ends inside the block, so at least one more window is needed
    if (w.ekey < pBlockInfo->window.ekey) {
      return true;
    }

    // the first window covers the block; look for later (sliding) windows that still start inside it
    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.skey > pBlockInfo->window.ekey) {
        break;
      }

      ASSERT(w.ekey > pBlockInfo->window.ekey);
      if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
        return true;
      }
    }
  } else {
    w = getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.ekey);
    ASSERT(w.skey <= pBlockInfo->window.ekey);

    // the first window starts inside the block, so at least one more window is needed
    if (w.skey > pBlockInfo->window.skey) {
      return true;
    }

    // the first window covers the block; look for earlier (sliding) windows that still end inside it
    while (1) {
      getNextTimeWindow(pInterval, &w, order);
      if (w.ekey < pBlockInfo->window.skey) {
        break;
      }

      // use the project assertion macro, like every other check in this function
      ASSERT(w.skey < pBlockInfo->window.skey);
      if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
        return true;
      }
    }
  }

  return false;
}

169 170 171 172 173 174 175 176 177 178 179
/*
 * Used by the table scanner to look up the temporary result row that the upstream aggregate keeps for a group.
 *
 * @param pOperator  must be a table scan operator, otherwise NULL is returned
 * @param groupId    group whose result row is wanted
 * @param pPage      receives the buffer page obtained from getBufPage(); it is left untouched when the lookup
 *                   fails before the page is fetched. The visible caller hands the page back with
 *                   releaseBufPage().
 * @return the result row inside `*pPage`, or NULL when the operator type does not match, the group has no entry
 *         in the result hash table, or the page cannot be obtained.
 */
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
    return NULL;
  }

  // build the hash key for this group
  int64_t keyBuf[2] = {0};
  SET_RES_WINDOW_KEY((char*)keyBuf, &groupId, sizeof(groupId), groupId);

  STableScanInfo* pScan = pOperator->info;

  SResultRowPosition* pPos = (SResultRowPosition*)tSimpleHashGet(
      pScan->base.pdInfo.pAggSup->pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
  if (pPos == NULL) {
    return NULL;
  }

  *pPage = getBufPage(pScan->base.pdInfo.pAggSup->pResultBuf, pPos->pageId);
  return (*pPage == NULL) ? NULL : (SResultRow*)((char*)(*pPage) + pPos->offset);
}

/*
 * Ask every pushed-down function whether, given its current intermediate result for the block's group, the block
 * still has to be loaded. `*status` is set to FUNC_DATA_REQUIRED_NOT_LOAD only when all of them answer "not load";
 * otherwise it is left untouched. Nothing happens when no expression support was pushed down or the group has no
 * result row yet.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
  STableScanInfo* pScan = pOperator->info;
  SExprSupp*      pExprSup = pScan->base.pdInfo.pExprSup;
  if (pExprSup == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SFilePage*  pPage = NULL;
  SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->id.groupId, &pPage);
  if (pRow == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  // stop at the first function that still needs the block
  bool blockNeeded = false;
  for (int32_t i = 0; i < pExprSup->numOfExprs && !blockNeeded; ++i) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pExprSup->rowEntryInfoOffset);
    int32_t required = fmFuncDynDataRequired(pExprSup->pCtx[i].functionId, pEntry, &pBlockInfo->window);
    blockNeeded = (required != FUNC_DATA_REQUIRED_NOT_LOAD);
  }

  // release buffer pages
  releaseBufPage(pScan->base.pdInfo.pAggSup->pResultBuf, pPage);

  if (!blockNeeded) {
    *status = FUNC_DATA_REQUIRED_NOT_LOAD;
  }

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
234
/*
 * Evaluate the filter against the per-column block aggregates.
 *
 * @return the verdict of filterRangeExecute(); true (keep the block) when either the filter or the aggregates
 *         are missing.
 */
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
                               int32_t numOfRows) {
  if (pFilterInfo != NULL && pColsAgg != NULL) {
    return filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
  }

  return true;
}

H
Haojun Liao 已提交
244
/*
 * Load the block-level SMA of the current block through the reader.
 *
 * A reader error does not return: it long-jumps out through the task environment with the error code.
 *
 * @return true when every column has an aggregate, false otherwise.
 */
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  bool hasAllAgg = true;

  int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, pBlock, &hasAllAgg);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return hasAllAgg;
}

H
Haojun Liao 已提交
257
/*
 * Fill the tag / pseudo columns of `pBlock` for `rows` rows; a no-op when the scan has no pseudo expressions.
 *
 * TSDB_CODE_PAR_TABLE_NOT_EXIST is tolerated, any other failure long-jumps out through the task environment.
 * `terrno` is cleared afterwards.
 */
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
                               int32_t rows) {
  SExprSupp* pSup = &pTableScanInfo->pseudoSup;
  if (pSup->numOfExprs <= 0) {
    return;
  }

  int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows,
                                        GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache);

  // a missing table is not an error here: it may have been dropped while the scan was running
  bool tolerated = (code == TSDB_CODE_SUCCESS) || (code == TSDB_CODE_PAR_TABLE_NOT_EXIST);
  if (!tolerated) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  // reset the error code.
  terrno = 0;
}

274
/*
 * Apply the remaining OFFSET and the LIMIT of `pLimitInfo` to `pBlock` in place and update the counters.
 *
 * - While an offset remains, leading rows are dropped; a block that lies entirely inside the offset is emptied
 *   and false is returned right away.
 * - When the limit (if not -1) is reached with this block, the block is cut to the rows still allowed.
 *
 * @return true when the output limit has been reached with this block, false otherwise.
 */
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
  const char* id = GET_TASKID(pTaskInfo);
  SLimit*     pLimit = &pLimitInfo->limit;

  if (pLimitInfo->remainOffset > 0) {
    if (pLimitInfo->remainOffset < pBlock->info.rows) {
      // the offset ends inside this block: drop the skipped head and go on
      blockDataTrimFirstRows(pBlock, pLimitInfo->remainOffset);
      pLimitInfo->remainOffset = 0;
    } else {
      // the whole block is consumed by the offset
      pLimitInfo->remainOffset -= pBlock->info.rows;
      blockDataEmpty(pBlock);
      qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
      return false;
    }
  }

  bool limitHit = (pLimit->limit != -1) && (pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows));
  if (limitHit) {
    // limit the output rows
    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
    blockDataKeepFirstNRows(pBlock, keep);
  }

  pLimitInfo->numOfOutputRows += pBlock->info.rows;

  if (limitHit) {
    qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
  }

  return limitHit;
}

H
Haojun Liao 已提交
304
/*
 * Decide how the block currently positioned in the reader has to be treated and, if required, load it.
 *
 * The decision starts from `pTableScanInfo->dataBlockLoadFlag` and is forced to a full load when a filter exists
 * or the block spans several interval windows. Afterwards the block is either
 *   - dropped            (*status == FUNC_DATA_REQUIRED_FILTEROUT),
 *   - skipped            (*status == FUNC_DATA_REQUIRED_NOT_LOAD, only the tag columns are filled),
 *   - answered from SMA  (*status == FUNC_DATA_REQUIRED_SMA_LOAD, when all columns carry SMA), or
 *   - fully loaded, filtered and cut by limit/offset (*status == FUNC_DATA_REQUIRED_DATA_LOAD).
 * A block pruned by the block SMA or by the dynamic prune reports FUNC_DATA_REQUIRED_FILTEROUT.
 * The operator is marked completed when the limit is reached.
 *
 * @param status  out: the final treatment of the block
 * @return TSDB_CODE_SUCCESS, or `terrno` when the reader fails to deliver the block. Helpers called from here
 *         may also long-jump out through the task environment.
 */
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
                             uint32_t* status) {
  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
  SDataBlockInfo*         pBlockInfo = &pBlock->info;

  pCost->totalBlocks += 1;
  pCost->totalRows += pBlockInfo->rows;

  bool smaTried = false;

  *status = pTableScanInfo->dataBlockLoadFlag;
  if (pOperator->exprSupp.pFilterInfo != NULL ||
      overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, pBlockInfo, pTableScanInfo->cond.order)) {
    *status = FUNC_DATA_REQUIRED_DATA_LOAD;
  }

  taosMemoryFreeClear(pBlock->pBlockAgg);

  switch (*status) {
    case FUNC_DATA_REQUIRED_FILTEROUT: {
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
             pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
      pCost->filterOutBlocks += 1;
      pCost->totalRows += pBlockInfo->rows;
      tsdbReleaseDataBlock(pTableScanInfo->dataReader);
      return TSDB_CODE_SUCCESS;
    }

    case FUNC_DATA_REQUIRED_NOT_LOAD: {
      qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64,
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows,
             pBlockInfo->id.uid);
      doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlockInfo->rows);
      pCost->skipBlocks += 1;
      tsdbReleaseDataBlock(pTableScanInfo->dataReader);
      return TSDB_CODE_SUCCESS;
    }

    case FUNC_DATA_REQUIRED_SMA_LOAD: {
      pCost->loadBlockStatis += 1;
      smaTried = true;  // remember that the SMA has been requested once already

      if (doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
        // the block statistics are complete, the raw data is not needed
        qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
               pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
        doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlockInfo->rows);
        tsdbReleaseDataBlock(pTableScanInfo->dataReader);
        return TSDB_CODE_SUCCESS;
      }

      // the block statistics are incomplete, load the data block instead
      qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
      *status = FUNC_DATA_REQUIRED_DATA_LOAD;
      break;
    }

    default:
      break;
  }

  ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);

  // try to filter data block according to sma info
  if (pOperator->exprSupp.pFilterInfo != NULL && !smaTried) {
    if (doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo)) {
      size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
      bool   keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, numOfCols, pBlockInfo->rows);
      if (!keep) {
        qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
               GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
        pCost->filterOutBlocks += 1;
        *status = FUNC_DATA_REQUIRED_FILTEROUT;

        tsdbReleaseDataBlock(pTableScanInfo->dataReader);
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  // free the sma info, since it should not be involved in later computing process.
  taosMemoryFreeClear(pBlock->pBlockAgg);

  // try to filter data block according to current results
  doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
  if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
    qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64,
           GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
    pCost->skipBlocks += 1;
    tsdbReleaseDataBlock(pTableScanInfo->dataReader);
    *status = FUNC_DATA_REQUIRED_FILTEROUT;
    return TSDB_CODE_SUCCESS;
  }

  pCost->totalCheckedRows += pBlockInfo->rows;
  pCost->loadBlocks += 1;

  SSDataBlock* pLoaded = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
  if (pLoaded == NULL) {
    return terrno;
  }

  ASSERT(pLoaded == pBlock);
  doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlockInfo->rows);

  // take back the rows counted on entry; the rows that survive filter and limit are added again below
  pCost->totalRows -= pBlockInfo->rows;

  if (pOperator->exprSupp.pFilterInfo != NULL) {
    int64_t filterStart = taosGetTimestampUs();
    doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);

    double el = (taosGetTimestampUs() - filterStart) / 1000.0;
    pTableScanInfo->readRecorder.filterTime += el;

    if (pBlockInfo->rows == 0) {
      pCost->filterOutBlocks += 1;
      qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
             GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
    } else {
      qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
    }
  }

  // mark the operator as done once the limit is reached
  if (applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo)) {
    setOperatorCompleted(pOperator);
  }

  pCost->totalRows += pBlockInfo->rows;
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
426
/*
 * Switch the scan state to the descending pass: raise the reverse-scan flag, flip the order of the first
 * `numOfOutput` function contexts, set the query condition to descending order and swap the two ends of its
 * time window.
 */
static void prepareForDescendingScan(STableScanBase* pTableScanInfo, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  pTableScanInfo->scanFlag = REVERSE_SCAN;
  switchCtxOrder(pCtx, numOfOutput);

  pTableScanInfo->cond.order = TSDB_ORDER_DESC;
  TSWAP(pTableScanInfo->cond.twindows.skey, pTableScanInfo->cond.twindows.ekey);
}

435 436
typedef struct STableCachedVal {
  const char* pName;
437
  STag*       pTags;
438 439
} STableCachedVal;

440 441 442 443 444 445 446 447 448 449 450
/*
 * Release a cached table value together with the name and tag copies it owns. NULL is accepted.
 */
static void freeTableCachedVal(void* param) {
  STableCachedVal* pCached = param;
  if (pCached != NULL) {
    taosMemoryFree((void*)pCached->pName);
    taosMemoryFree(pCached->pTags);
    taosMemoryFree(pCached);
  }
}

H
Haojun Liao 已提交
451 452
static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) {
  STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal));
453
  pVal->pName = taosStrdup(pMetaReader->me.name);
H
Haojun Liao 已提交
454 455 456 457
  pVal->pTags = NULL;

  // only child table has tag value
  if (pMetaReader->me.type == TSDB_CHILD_TABLE) {
458
    STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags;
H
Haojun Liao 已提交
459 460 461 462 463 464 465
    pVal->pTags = taosMemoryMalloc(pTag->len);
    memcpy(pVal->pTags, pTag, pTag->len);
  }

  return pVal;
}

466 467
// Deleter handed to the LRU cache (signature: const void *key, size_t keyLen, void *value).
// Only the value is owned by the entry; the key is ignored.
static void freeCachedMetaItem(const void* key, size_t keyLen, void* value) {
  (void)key;
  (void)keyLen;
  freeTableCachedVal(value);
}
468

469 470 471 472 473
/*
 * Mark rows [0, pBlock->info.rows) as NULL in the destination column of every one of the `numOfExpr` expressions.
 */
static void doSetNullValue(SSDataBlock* pBlock, const SExprInfo* pExpr, int32_t numOfExpr) {
  const SExprInfo* pEnd = pExpr + numOfExpr;

  for (const SExprInfo* pCur = pExpr; pCur < pEnd; ++pCur) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pCur->base.resSchema.slotId);
    colDataSetNNULL(pDst, 0, pBlock->info.rows);
  }
}

478 479
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
                               int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) {
480
  // currently only the tbname pseudo column
481
  if (numOfExpr <= 0) {
H
Haojun Liao 已提交
482
    return TSDB_CODE_SUCCESS;
483 484
  }

485 486
  int32_t code = 0;

487 488 489 490
  // backup the rows
  int32_t backupRows = pBlock->info.rows;
  pBlock->info.rows = rows;

491
  bool            freeReader = false;
492
  STableCachedVal val = {0};
493 494

  SMetaReader mr = {0};
495
  LRUHandle*  h = NULL;
496

497 498 499
  // todo refactor: extract method
  // the handling of the null data should be packed in the extracted method

500
  // 1. check if it is existed in meta cache
501
  if (pCache == NULL) {
502
    metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
503
    code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
504
    if (code != TSDB_CODE_SUCCESS) {
505
      // when encounter the TSDB_CODE_PAR_TABLE_NOT_EXIST error, we proceed.
H
Haojun Liao 已提交
506
      if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
S
slzhou 已提交
507 508
        qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
              pBlock->info.id.uid, tstrerror(terrno), idStr);
509 510 511

        // append null value before return to caller, since the caller will ignore this error code and proceed
        doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
512
      } else {
S
slzhou 已提交
513 514
        qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
               idStr);
H
Haojun Liao 已提交
515
      }
516 517 518 519 520
      metaReaderClear(&mr);
      return terrno;
    }

    metaReaderReleaseLock(&mr);
521

522 523
    val.pName = mr.me.name;
    val.pTags = (STag*)mr.me.ctbEntry.pTags;
524 525

    freeReader = true;
526
  } else {
527 528
    pCache->metaFetch += 1;

H
Haojun Liao 已提交
529
    h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
530 531
    if (h == NULL) {
      metaReaderInit(&mr, pHandle->meta, 0);
H
Haojun Liao 已提交
532
      code = metaGetTableEntryByUidCache(&mr, pBlock->info.id.uid);
533
      if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
534
        if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
535
          qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
H
Haojun Liao 已提交
536
                pBlock->info.id.uid, tstrerror(terrno), idStr);
537 538
          // append null value before return to caller, since the caller will ignore this error code and proceed
          doSetNullValue(pBlock, pExpr, numOfExpr);
H
Haojun Liao 已提交
539
        } else {
H
Haojun Liao 已提交
540
          qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.id.uid, tstrerror(terrno),
541
                 idStr);
H
Haojun Liao 已提交
542
        }
543 544 545 546 547 548
        metaReaderClear(&mr);
        return terrno;
      }

      metaReaderReleaseLock(&mr);

H
Haojun Liao 已提交
549
      STableCachedVal* pVal = createTableCacheVal(&mr);
550

H
Haojun Liao 已提交
551
      val = *pVal;
552
      freeReader = true;
H
Haojun Liao 已提交
553

H
Haojun Liao 已提交
554
      int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.id.uid, sizeof(uint64_t), pVal,
555
                                       sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW);
556 557 558 559 560 561 562 563
      if (ret != TAOS_LRU_STATUS_OK) {
        qError("failed to put meta into lru cache, code:%d, %s", ret, idStr);
        freeTableCachedVal(pVal);
      }
    } else {
      pCache->cacheHit += 1;
      STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
      val = *pVal;
H
Haojun Liao 已提交
564

H
Haojun Liao 已提交
565
      taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
566
    }
H
Haojun Liao 已提交
567

568 569
    qDebug("retrieve table meta from cache:%" PRIu64 ", hit:%" PRIu64 " miss:%" PRIu64 ", %s", pCache->metaFetch,
           pCache->cacheHit, (pCache->metaFetch - pCache->cacheHit), idStr);
H
Haojun Liao 已提交
570
  }
571

572 573
  for (int32_t j = 0; j < numOfExpr; ++j) {
    const SExprInfo* pExpr1 = &pExpr[j];
574
    int32_t          dstSlotId = pExpr1->base.resSchema.slotId;
575 576

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
D
dapan1121 已提交
577
    colInfoDataCleanup(pColInfoData, pBlock->info.rows);
578

579
    int32_t functionId = pExpr1->pExpr->_function.functionId;
580 581 582

    // this is to handle the tbname
    if (fmIsScanPseudoColumnFunc(functionId)) {
583
      setTbNameColData(pBlock, pColInfoData, functionId, val.pName);
584
    } else {  // these are tags
wmmhello's avatar
wmmhello 已提交
585
      STagVal tagVal = {0};
586 587
      tagVal.cid = pExpr1->base.pParam[0].pCol->colId;
      const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal);
wmmhello's avatar
wmmhello 已提交
588

589 590 591 592
      char* data = NULL;
      if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
        data = tTagValToData((const STagVal*)p, false);
      } else {
wmmhello's avatar
wmmhello 已提交
593
        data = (char*)p;
wmmhello's avatar
wmmhello 已提交
594
      }
595

H
Haojun Liao 已提交
596 597
      bool isNullVal = (data == NULL) || (pColInfoData->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data));
      if (isNullVal) {
598
        colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
H
Haojun Liao 已提交
599
      } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
D
dapan1121 已提交
600
        code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
H
Haojun Liao 已提交
601 602 603
        if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
          taosMemoryFree(data);
        }
D
dapan1121 已提交
604 605 606 607 608 609
        if (code) {
          if (freeReader) {
            metaReaderClear(&mr);
          }
          return code;
        }
L
Liu Jicong 已提交
610
      } else {  // todo opt for json tag
H
Haojun Liao 已提交
611
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
612
          colDataSetVal(pColInfoData, i, data, false);
H
Haojun Liao 已提交
613
        }
614 615 616 617
      }
    }
  }

618 619
  // restore the rows
  pBlock->info.rows = backupRows;
620 621 622 623
  if (freeReader) {
    metaReaderClear(&mr);
  }

H
Haojun Liao 已提交
624
  return TSDB_CODE_SUCCESS;
625 626
}

H
Haojun Liao 已提交
627
/*
 * Produce the table-name pseudo column: `name` is wrapped into a one-row varchar column and passed, together
 * with the row count of `pBlock`, to the scalar function registered for `functionId`, which writes into
 * `pColInfoData`. When no such function is registered an error is logged and the column is left as is.
 */
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) {
  struct SScalarFuncExecFuncs execFuncs = {0};
  fmGetScalarFuncExecFuncs(functionId, &execFuncs);

  // the name in var-string layout
  char varName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(varName, name);

  // single-row source column holding the name
  SColumnInfoData nameCol = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, sizeof(varName), 1);
  colInfoDataEnsureCapacity(&nameCol, 1, false);
  colDataSetVal(&nameCol, 0, varName, false);

  if (execFuncs.process == NULL) {
    qError("failed to get the corresponding callback function, functionId:%d", functionId);
  } else {
    SScalarParam input = {.numOfRows = pBlock->info.rows, .columnData = &nameCol};
    SScalarParam output = {.columnData = pColInfoData};
    execFuncs.process(&input, 1, &output);
  }

  colDataDestroy(&nameCol);
}

652
/*
 * Pull blocks from the reader until one of them yields rows for the caller.
 *
 * Blocks that are filtered out or end up empty are skipped. Reader errors, block loading errors and a killed
 * task do not return: they long-jump out through the task environment.
 *
 * @return the operator's result block filled with the next qualifying data, or NULL when the reader is exhausted
 *         or the operator has been marked as done (e.g. the limit was reached).
 */
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
  STableScanInfo* pTableScanInfo = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*    pBlock = pTableScanInfo->pResBlock;
  bool            hasNext = false;
  int32_t         code = TSDB_CODE_SUCCESS;

  int64_t st = taosGetTimestampUs();

  while (true) {
    code = tsdbNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
    if (code) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }

    if (isTaskKilled(pTaskInfo)) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (pOperator->status == OP_EXEC_DONE) {
      tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
      break;
    }

    // process this data block based on the probabilities
    bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
    if (!processThisBlock) {
      continue;
    }

    if (pBlock->info.id.uid) {
      pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
    }

    uint32_t status = 0;
    // reuse the function-level `code`; a second declaration here would shadow it
    code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

    // publish the statistics gathered so far before handing the block out
    pOperator->resultInfo.totalRows = pTableScanInfo->base.readRecorder.totalRows;
    pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;

    pOperator->cost.totalCost = pTableScanInfo->base.readRecorder.elapsedTime;

    return pBlock;
  }

  return NULL;
}

H
Haojun Liao 已提交
720
/*
 * Scan the tables currently bound to the reader: first `scanInfo.numOfAsc` ascending passes, then
 * `scanInfo.numOfDesc` descending passes. `scanTimes` counts the finished passes, so the function resumes where
 * it stopped on the previous call. Before every additional pass the reader is reset with the current condition.
 *
 * @return the next non-empty block, or NULL when all passes are finished, the reader has not been created, or
 *         the operator is already done.
 */
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
  STableScanBase* pBase = &pScan->base;

  // The read handle is not initialized yet, since no qualified tables exists
  if (pBase->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  // ascending passes come first
  while (pScan->scanTimes < pScan->scanInfo.numOfAsc) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pScan->scanTimes += 1;
    if (pScan->scanTimes >= pScan->scanInfo.numOfAsc) {
      break;
    }

    // another ascending pass is required: rewind the reader
    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = MAIN_SCAN;
    pBase->dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
    qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo));

    tsdbReaderReset(pBase->dataReader, &pBase->cond);
  }

  int32_t total = pScan->scanInfo.numOfAsc + pScan->scanInfo.numOfDesc;
  if (pScan->scanTimes >= total) {
    return NULL;
  }

  // entering the descending passes for the first time: flip the scan direction
  if (pBase->cond.order == TSDB_ORDER_ASC) {
    prepareForDescendingScan(pBase, pOperator->exprSupp.pCtx, 0);
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
    qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
  }

  while (pScan->scanTimes < total) {
    SSDataBlock* pRes = doTableScanImpl(pOperator);
    if (pRes != NULL) {
      return pRes;
    }

    pScan->scanTimes += 1;
    if (pScan->scanTimes >= total) {
      break;
    }

    // another descending pass is required: rewind the reader
    setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
    pBase->scanFlag = MAIN_SCAN;

    qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo));
    tsdbReaderReset(pBase->dataReader, &pBase->cond);
  }

  return NULL;
}

// Table-order mode: keep pulling blocks for the table currently attached to the reader; once it
// yields nothing, attach the next table of the table list to the reader and try again.
static SSDataBlock* doTableScanInTableOrder(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;

  for (;;) {
    SSDataBlock* pRes = doGroupedTableScan(pOperator);
    if (pRes != NULL || pOperator->status == OP_EXEC_DONE || isTaskKilled(pTask)) {
      return pRes;
    }

    // nothing produced by the current table, advance to the next one
    pScan->currentTable++;

    // the table list is read under the task read latch
    taosRLockLatch(&pTask->lock);
    int32_t tableCount = tableListGetSize(pScan->base.pTableListInfo);
    if (pScan->currentTable >= tableCount) {
      qDebug("all table checked in table list, total:%d, return NULL, %s", tableCount, GET_TASKID(pTask));
      taosRUnLockLatch(&pTask->lock);
      return NULL;
    }

    // copy the entry while the latch is held, then release the latch before touching the reader
    STableKeyInfo nextTable = *(STableKeyInfo*)tableListGetInfo(pScan->base.pTableListInfo, pScan->currentTable);
    taosRUnLockLatch(&pTask->lock);

    tsdbSetTableList(pScan->base.dataReader, &nextTable, 1);
    qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", nextTable.uid, tableCount,
           pScan->currentTable, tableCount, GET_TASKID(pTask));

    tsdbReaderReset(pScan->base.dataReader, &pScan->base.cond);
    pScan->scanTimes = 0;
  }
}

// Group-order mode: the reader is opened lazily for the first output group; when the current group
// is exhausted the reader is re-targeted at the next group and scanned once more.
static SSDataBlock* doTableScanInGroupOrder(SOperatorInfo* pOperator) {
  STableScanInfo* pScan = pOperator->info;
  SExecTaskInfo*  pTask = pOperator->pTaskInfo;
  int32_t         numOfTables = 0;
  STableKeyInfo*  pGroupTables = NULL;

  if (pScan->currentGroupId == -1) {
    // first invocation: move to group 0 and open the reader on its table list
    pScan->currentGroupId += 1;
    if (pScan->currentGroupId >= tableListGetOutputGroups(pScan->base.pTableListInfo)) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    tableListGetGroupList(pScan->base.pTableListInfo, pScan->currentGroupId, &pGroupTables, &numOfTables);
    ASSERT(pScan->base.dataReader == NULL);

    int32_t code = tsdbReaderOpen(pScan->base.readHandle.vnode, &pScan->base.cond, pGroupTables, numOfTables,
                                  pScan->pResBlock, (STsdbReader**)&pScan->base.dataReader, GET_TASKID(pTask),
                                  pScan->countOnly);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTask->env, code);
    }

    // adopt the larger capacity if the result block capacity now exceeds the operator's
    if (pScan->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
      pOperator->resultInfo.capacity = pScan->pResBlock->info.capacity;
    }
  }

  SSDataBlock* pRes = doGroupedTableScan(pOperator);
  if (pRes != NULL) {
    return pRes;
  }

  // current group exhausted, step to the next one
  pScan->currentGroupId += 1;
  if (pScan->currentGroupId >= tableListGetOutputGroups(pScan->base.pTableListInfo)) {
    setOperatorCompleted(pOperator);
    return NULL;
  }

  // reset value for the next group data output
  pOperator->status = OP_OPENED;
  resetLimitInfoForNextGroup(&pScan->base.limitInfo);

  numOfTables = 0;
  pGroupTables = NULL;
  tableListGetGroupList(pScan->base.pTableListInfo, pScan->currentGroupId, &pGroupTables, &numOfTables);

  tsdbSetTableList(pScan->base.dataReader, pGroupTables, numOfTables);
  tsdbReaderReset(pScan->base.dataReader, &pScan->base.cond);
  pScan->scanTimes = 0;

  pRes = doGroupedTableScan(pOperator);
  if (pRes != NULL) {
    return pRes;
  }

  // NOTE(review): an empty group marks the whole operator as completed even if further groups
  // remain -- confirm this is intended.
  setOperatorCompleted(pOperator);
  return NULL;
}

// Entry point of the table scan operator: returns the next data block, or NULL when no block is
// available. Dispatches on the configured scan mode.
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
  STableScanInfo* pInfo = pOperator->info;

  // scan table one by one sequentially
  if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
    return doTableScanInTableOrder(pOperator);
  }

  // scan table group by group sequentially
  return doTableScanInGroupOrder(pOperator);
}

870 871
// Export a heap-allocated copy of the scan's file-block load recorder for EXPLAIN output.
//   pOptr         table scan operator whose `info` is an STableScanInfo
//   pOptrExplain  out: newly allocated SFileBlockLoadRecorder (NULL on failure)
//   len           out: size of the exported structure in bytes (0 on failure)
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the copy cannot be allocated.
static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder));
  if (pRecorder == NULL) {
    // the original dereferenced the allocation unconditionally
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableScanInfo* pTableScanInfo = pOptr->info;
  *pRecorder = pTableScanInfo->base.readRecorder;

  *pOptrExplain = pRecorder;
  *len = sizeof(SFileBlockLoadRecorder);
  return 0;
}

879 880
// Release everything held by the shared scan base: query condition, data reader, column match
// list, table list, table meta cache and the pseudo-column expression support.
static void destroyTableScanBase(STableScanBase* pBase) {
  cleanupQueryTableDataCond(&pBase->cond);

  // close the reader and clear the pointer afterwards
  tsdbReaderClose(pBase->dataReader);
  pBase->dataReader = NULL;

  SArray* pMatchList = pBase->matchInfo.pList;
  if (pMatchList != NULL) {
    taosArrayDestroy(pMatchList);
  }

  tableListDestroy(pBase->pTableListInfo);
  taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache);
  cleanupExprSupp(&pBase->pseudoSup);
}

static void destroyTableScanOperatorInfo(void* param) {
  STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
  blockDataDestroy(pTableScanInfo->pResBlock);
  destroyTableScanBase(&pTableScanInfo->base);
D
dapan1121 已提交
898
  taosMemoryFreeClear(param);
899 900
}

901
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
902
                                           STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
X
Xiaoyu Wang 已提交
903
  int32_t         code = 0;
H
Haojun Liao 已提交
904 905 906
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
907
    code = TSDB_CODE_OUT_OF_MEMORY;
908
    goto _error;
H
Haojun Liao 已提交
909 910
  }

911
  SScanPhysiNode*     pScanNode = &pTableScanNode->scan;
H
Haojun Liao 已提交
912
  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
913 914

  int32_t numOfCols = 0;
X
Xiaoyu Wang 已提交
915
  code =
H
Haojun Liao 已提交
916
      extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->base.matchInfo);
917 918 919 920
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
921
  initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->base.limitInfo);
H
Haojun Liao 已提交
922
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
923
  if (code != TSDB_CODE_SUCCESS) {
924
    goto _error;
925 926
  }

H
Haojun Liao 已提交
927
  if (pScanNode->pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
928
    SExprSupp* pSup = &pInfo->base.pseudoSup;
H
Haojun Liao 已提交
929
    pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs);
930
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
931 932
  }

933
  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
G
Ganlin Zhao 已提交
934
  pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? PRE_SCAN : MAIN_SCAN;
H
Haojun Liao 已提交
935

H
Haojun Liao 已提交
936 937
  pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
  pInfo->base.readHandle = *readHandle;
H
Haojun Liao 已提交
938 939
  pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;

940 941
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
  pInfo->sample.seed = taosGetTimestampSec();
942

H
Haojun Liao 已提交
943
  initResultSizeInfo(&pOperator->resultInfo, 4096);
H
Haojun Liao 已提交
944
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
X
Xiaoyu Wang 已提交
945
  //  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
H
Haojun Liao 已提交
946

H
Haojun Liao 已提交
947 948 949
  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
H
Haojun Liao 已提交
950 951
  }

wmmhello's avatar
wmmhello 已提交
952
  pInfo->currentGroupId = -1;
953
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
954
  pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
955

L
Liu Jicong 已提交
956 957
  setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
958
  pOperator->exprSupp.numOfExprs = numOfCols;
959

960
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
961 962
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
963 964 965
    code = terrno;
    goto _error;
  }
966

D
dapan1121 已提交
967 968 969 970
  if (scanDebug) {
    pInfo->countOnly = true;
  }

H
Haojun Liao 已提交
971
  taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
972 973
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
                                         optrDefaultBufFn, getTableScannerExecInfo);
974 975 976

  // for non-blocking operator, the open cost is always 0
  pOperator->cost.openCost = 0;
H
Haojun Liao 已提交
977
  return pOperator;
978

979
_error:
980 981 982
  if (pInfo != NULL) {
    destroyTableScanOperatorInfo(pInfo);
  }
983

984 985
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
986
  return NULL;
H
Haojun Liao 已提交
987 988
}

989
// Create a sequential table scan operator on top of an already opened reader.
//   pReadHandle  reader stored as the scan's data reader
//   pTaskInfo    owning task; receives TSDB_CODE_OUT_OF_MEMORY when allocation fails
// Returns the new operator, or NULL if either allocation fails.
SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) {
  STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
  SOperatorInfo*  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    // the original wrote through pInfo without checking either allocation
    taosMemoryFreeClear(pInfo);
    taosMemoryFreeClear(pOperator);
    pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pInfo->base.dataReader = pReadHandle;
  //  pInfo->prevGroupId       = -1;

  setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL);
  return pOperator;
}

1002
// Drop all buffered blocks of the stream scan and rewind the valid-block cursor to 0.
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
  int32_t numOfBuffered = (int32_t)taosArrayGetSize(pInfo->pBlockLists);
  qDebug("clear buff blocks:%d", numOfBuffered);

  taosArrayClear(pInfo->pBlockLists);
  pInfo->validBlockIndex = 0;
}

1008
// True when the parent operator type recorded in windowSup is the stream session window.
static bool isSessionWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION) {
    return true;
  }
  return false;
}

1012
// True when the parent operator type recorded in windowSup is the stream state window.
static bool isStateWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
    return true;
  }
  return false;
}
5
54liuyao 已提交
1015

L
Liu Jicong 已提交
1016
// True when the parent operator is any of the stream interval flavours
// (plain, semi or final interval).
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
    return true;
  }
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
    return true;
  }
  return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
}

// True only for the plain stream interval parent (not the semi/final variants).
static bool isSignleIntervalWindow(SStreamScanInfo* pInfo) {
  if (pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
    return true;
  }
  return false;
}

1026 1027 1028 1029
// An interval window counts as sliding when its interval differs from its sliding step.
static bool isSlidingWindow(SStreamScanInfo* pInfo) {
  if (!isIntervalWindow(pInfo)) {
    return false;
  }
  return pInfo->interval.interval != pInfo->interval.sliding;
}

1030
// Store into pInfo->groupId the value found at row `rowIndex` of column `groupColIndex` of pBlock;
// the column data is read as an array of uint64_t.
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
  ASSERT(rowIndex < pBlock->info.rows);

  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, groupColIndex);
  const uint64_t*  groupIds = (uint64_t*)pGroupCol->pData;
  pInfo->groupId = groupIds[rowIndex];
}

L
fix bug  
liuyao 已提交
1037
// Re-arm a table scan for a new time window and end version: the scan counters are rewound, the
// group cursor goes back to -1 and the current reader is closed and cleared.
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t version) {
  STableScanBase* pBase = &pTableScanInfo->base;

  pBase->cond.twindows = *pWin;
  pBase->cond.endVersion = version;

  pTableScanInfo->scanTimes = 0;
  pTableScanInfo->currentGroupId = -1;

  tsdbReaderClose(pBase->dataReader);
  qDebug("1");
  pBase->dataReader = NULL;
}

L
Liu Jicong 已提交
1047 1048
// Read the rows of table `tbUid` inside [startTs, endTs] as they were at version `maxVersion`,
// using a temporary reader and the table scan's own result block.
//   pTableScanOp  table scan operator providing the query condition, vnode handle and result block
//   tbUid         uid of the table to read
//   startTs/endTs time range to read
//   maxVersion    end version applied to the temporary query condition
// Returns the scan's result block when it holds at least one row, otherwise NULL. On reader
// failure the error is stored in terrno and raised through the task's long-jump environment.
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
                                       int64_t maxVersion) {
  STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0};

  STableScanInfo*     pTableScanInfo = pTableScanOp->info;
  SQueryTableDataCond cond = pTableScanInfo->base.cond;

  // work on a copy of the scan condition restricted to the requested range and version
  cond.startVersion = -1;
  cond.endVersion = maxVersion;
  cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs};

  SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;

  SSDataBlock* pBlock = pTableScanInfo->pResBlock;
  STsdbReader* pReader = NULL;
  int32_t      code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
                                     (STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  bool hasNext = false;
  code = tsdbNextDataBlock(pReader, &hasNext);
  if (code != TSDB_CODE_SUCCESS) {
    // close the temporary reader before jumping out, otherwise it is leaked
    tsdbReaderClose(pReader);
    terrno = code;
    T_LONG_JMP(pTaskInfo->env, code);
    return NULL;
  }

  if (hasNext) {
    /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
    doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
    pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);
  }

  tsdbReaderClose(pReader);
  qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
         ", suid:%" PRIu64,
         pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);

  return pBlock->info.rows > 0 ? pBlock : NULL;
}

// Compute the group id of the row of table `uid` at timestamp `ts` (as of `maxVersion`) from its
// data. Returns 0 when no such row can be read.
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  SSDataBlock* pPrev = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
  if (pPrev == NULL || pPrev->info.rows == 0) {
    return 0;
  }

  // a single timestamp is expected to match exactly one row
  ASSERT(pPrev->info.rows == 1);
  return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPrev, 0);
}

5
54liuyao 已提交
1101
// Look up the group id of table `uid` in the table list of the underlying table scan.
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
  STableScanInfo* pScan = pInfo->pTableScanOp->info;
  STableListInfo* pTableList = pScan->base.pTableListInfo;
  return getTableGroupId(pTableList, uid);
}

5
54liuyao 已提交
1106 1107 1108 1109 1110 1111 1112 1113
// Resolve a group id: from the row data when the partition needs calculation, otherwise from the
// table list by uid.
static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) {
  return pInfo->partitionSup.needCalc ? getGroupIdByCol(pInfo, uid, ts, maxVersion) : getGroupIdByUid(pInfo, uid);
}

L
Liu Jicong 已提交
1114
// Set up the table scan for the next range described in pBlock, starting at row *pRowIndex.
// Following rows of the same group that share the start key, or whose end key equals the current
// start key, are merged into the same window. On return *pRowIndex points at the first row that
// was not consumed. Returns false when the block is empty or fully consumed, true otherwise.
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
  if (pBlock->info.rows == 0 || (*pRowIndex) == pBlock->info.rows) {
    return false;
  }

  ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);

  TSKEY*    startData = (TSKEY*)((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX))->pData;
  TSKEY*    endData = (TSKEY*)((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX))->pData;
  uint64_t* gpData = (uint64_t*)((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX))->pData;
  TSKEY*    calStartData =
      (TSKEY*)((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX))->pData;
  TSKEY* calEndData =
      (TSKEY*)((SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX))->pData;

  int32_t     cur = *pRowIndex;
  STimeWindow win = {.skey = startData[cur], .ekey = endData[cur]};
  uint64_t    groupId = gpData[cur];

  setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, cur);
  if (isSlidingWindow(pInfo)) {
    pInfo->updateWin.skey = calStartData[cur];
    pInfo->updateWin.ekey = calEndData[cur];
  }

  // absorb the following rows that extend the same window
  for (cur += 1; cur < pBlock->info.rows; cur += 1) {
    if (groupId == gpData[cur]) {
      if (win.skey == startData[cur]) {
        win.ekey = TMAX(win.ekey, endData[cur]);
        continue;
      }
      if (win.skey == endData[cur]) {
        win.skey = TMIN(win.skey, startData[cur]);
        continue;
      }
    }

    ASSERT(!(win.skey > startData[cur] && win.ekey < endData[cur]) ||
           !(isInTimeWindow(&win, startData[cur], 0) || isInTimeWindow(&win, endData[cur], 0)));
    break;
  }
  *pRowIndex = cur;

  resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
  pInfo->pTableScanOp->status = OP_OPENED;
  return true;
}

5
54liuyao 已提交
1163
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
1164
                                    SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
H
Haojun Liao 已提交
1165
  SResultRowInfo dumyInfo = {0};
5
54liuyao 已提交
1166
  dumyInfo.cur.pageId = -1;
1167
  STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
5
54liuyao 已提交
1168 1169
  STimeWindow endWin = win;
  STimeWindow preWin = win;
5
54liuyao 已提交
1170
  uint64_t    groupId = gpIdCol[*pRowIndex];
H
Haojun Liao 已提交
1171

5
54liuyao 已提交
1172
  while (1) {
1173 1174 1175
    if (hasGroup) {
      (*pRowIndex) += 1;
    } else {
5
54liuyao 已提交
1176
      while ((groupId == gpIdCol[(*pRowIndex)] && startTsCol[*pRowIndex] <= endWin.ekey)) {
5
54liuyao 已提交
1177 1178 1179 1180 1181
        (*pRowIndex) += 1;
        if ((*pRowIndex) == pDataBlockInfo->rows) {
          break;
        }
      }
1182
    }
5
54liuyao 已提交
1183

5
54liuyao 已提交
1184 1185 1186
    do {
      preWin = endWin;
      getNextTimeWindow(pInterval, &endWin, TSDB_ORDER_ASC);
1187
    } while (endTsCol[(*pRowIndex) - 1] >= endWin.skey);
5
54liuyao 已提交
1188
    endWin = preWin;
5
54liuyao 已提交
1189
    if (win.ekey == endWin.ekey || (*pRowIndex) == pDataBlockInfo->rows || groupId != gpIdCol[*pRowIndex]) {
5
54liuyao 已提交
1190 1191 1192 1193 1194 1195
      win.ekey = endWin.ekey;
      return win;
    }
    win.ekey = endWin.ekey;
  }
}
5
54liuyao 已提交
1196

L
Liu Jicong 已提交
1197
// Scan the ranges listed in pSDB one after another through the underlying table scan and return
// the next block that belongs to the group currently being recomputed (pInfo->groupId). When all
// ranges are exhausted, pSDB is cleaned, *pRowIndex is reset to 0, the reader is closed and NULL
// is returned.
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
  qInfo("do stream range scan. windows index:%d", *pRowIndex);

  bool hasRange = true;
  for (;;) {
    SSDataBlock* pRes = doTableScan(pInfo->pTableScanOp);
    if (pRes == NULL) {
      // current range is drained: set up the next one and scan next window data
      hasRange = prepareRangeScan(pInfo, pSDB, pRowIndex);
      pRes = doTableScan(pInfo->pTableScanOp);

      if (pRes == NULL) {
        if (hasRange) {
          continue;
        }

        // no range left: reset the range state and release the reader
        blockDataCleanup(pSDB);
        *pRowIndex = 0;
        pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};

        STableScanInfo* pScan = pInfo->pTableScanOp->info;
        tsdbReaderClose(pScan->base.dataReader);
        qDebug("2");
        pScan->base.dataReader = NULL;
        return NULL;
      }
    }

    doFilter(pRes, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
    if (pRes->info.rows == 0) {
      continue;
    }

    if (!pInfo->partitionSup.needCalc) {
      // group id comes from the block itself
      if (pRes->info.id.groupId != pInfo->groupId) {
        continue;
      }
      pRes->info.calWin = pInfo->updateWin;
      return pRes;
    }

    // calculated partition: rebuild the block keeping only the rows of the wanted group
    SSDataBlock* pCopy = createOneDataBlock(pRes, true);
    blockDataCleanup(pRes);

    int32_t numOfCols = pInfo->pTableScanOp->exprSupp.numOfExprs;
    for (int32_t row = 0; row < pCopy->info.rows; row++) {
      if (calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pCopy, row) != pInfo->groupId) {
        continue;
      }

      for (int32_t col = 0; col < numOfCols; col++) {
        SColumnInfoData* pFrom = taosArrayGet(pCopy->pDataBlock, col);
        SColumnInfoData* pTo = taosArrayGet(pRes->pDataBlock, col);
        bool             isNull = colDataIsNull(pFrom, pCopy->info.rows, row, NULL);
        char*            pVal = colDataGetData(pFrom, row);
        colDataSetVal(pTo, pRes->info.rows, pVal, isNull);
      }
      pRes->info.rows++;
    }

    blockDataDestroy(pCopy);

    if (pRes->info.rows > 0) {
      pRes->info.calWin = pInfo->updateWin;
      return pRes;
    }
  }
}
1255

1256
static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
X
Xiaoyu Wang 已提交
1257
                                   SSessionKey* pKey) {
1258 1259 1260
  pKey->win.skey = startTs;
  pKey->win.ekey = endTs;
  pKey->groupId = groupId;
X
Xiaoyu Wang 已提交
1261

1262 1263 1264 1265 1266
  SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, pKey);
  int32_t          code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
  if (code != TSDB_CODE_SUCCESS) {
    SET_SESSION_WIN_KEY_INVALID(pKey);
  }
D
dapan1121 已提交
1267 1268

  taosMemoryFree(pCur);
1269 1270 1271
  return code;
}

1272
// Translate every (start, end, uid) row of pSrcBlock into the session-window range that has to be
// rescanned, and append it to pDestBlock (which is cleaned first).
//   pInfo       stream scan info providing the session aggregate support
//   pSrcBlock   source rows; needs at least the start-ts, end-ts and uid columns
//   pDestBlock  receives one row per source row whose session windows could be resolved
// Rows whose start or end session window cannot be found are skipped. Returns TSDB_CODE_SUCCESS,
// or the error of blockDataEnsureCapacity.
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  if (pSrcBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
  SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startData = (TSKEY*)pStartTsCol->pData;
  SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  TSKEY*           endData = (TSKEY*)pEndTsCol->pData;
  SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;

  SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  int64_t          version = pSrcBlock->info.version - 1;
  for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
    uint64_t groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], version);
    // gap must be 0.
    SSessionKey startWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], startData[i], groupId, &startWin);
    if (IS_INVALID_SESSION_WIN_KEY(startWin)) {
      // window has been closed.
      continue;
    }
    SSessionKey endWin = {0};
    getCurSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // fall back to the session window preceding the end timestamp
      getPreSessionWindow(pInfo->windowSup.pStreamAggSup, endData[i], endData[i], groupId, &endWin);
    }
    // The original re-tested startWin here, which can never be invalid at this point, so a failed
    // end-window lookup went unnoticed. It is endWin that has to be validated.
    if (IS_INVALID_SESSION_WIN_KEY(endWin)) {
      // window has been closed.
      qError("generate session scan range failed. rang start:%" PRIx64 ", end:%" PRIx64, startData[i], endData[i]);
      continue;
    }

    // Write at the next free destination row rather than at the source index: skipped source rows
    // must not leave holes in a block whose row count only reflects the emitted rows.
    int32_t destRow = (int32_t)pDestBlock->info.rows;
    colDataSetVal(pDestStartCol, destRow, (const char*)&startWin.win.skey, false);
    colDataSetVal(pDestEndCol, destRow, (const char*)&endWin.win.ekey, false);

    colDataSetNULL(pDestUidCol, destRow);
    colDataSetVal(pDestGpCol, destRow, (const char*)&groupId, false);
    colDataSetNULL(pDestCalStartTsCol, destRow);
    colDataSetNULL(pDestCalEndTsCol, destRow);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1326 1327 1328 1329 1330 1331

// Convert the rows of pSrcBlock into the interval-window ranges that need to be rescanned and
// write them into pDestBlock (which is cleaned first).
//   pInfo       stream scan info (interval definition, partition support, table scan operator)
//   pSrcBlock   source rows; when the partition needs calculation and the first row spans a
//               range, the block is rewritten in place with one row per previously stored row
//   pDestBlock  receives one row per merged window
// Returns TSDB_CODE_SUCCESS, or the error of blockDataEnsureCapacity.
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);

  uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
    uint64_t     srcUid = srcUidData[0];
    TSKEY        startTs = srcStartTsCol[0];
    TSKEY        endTs = srcEndTsCol[0];
    SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);
    if (pPreRes == NULL) {
      // readPreVersionData() returns NULL when no row exists in the range; the original
      // dereferenced the result unconditionally. With nothing stored there is nothing to expand.
      blockDataCleanup(pSrcBlock);
      return TSDB_CODE_SUCCESS;
    }

    printDataBlock(pPreRes, "pre res");
    blockDataCleanup(pSrcBlock);
    int32_t ensureCode = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
    if (ensureCode != TSDB_CODE_SUCCESS) {
      return ensureCode;
    }

    SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
    rows = pPreRes->info.rows;

    // one source row per previously stored row, each carrying its calculated group id
    for (int32_t i = 0; i < rows; i++) {
      uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
      appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
                                       &groupId, NULL);
    }
    printDataBlock(pSrcBlock, "new delete");
  }

  // reload the column data pointers after the source block may have been rewritten above
  uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  srcUidData = (uint64_t*)pSrcUidCol->pData;

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  // `i` is advanced inside getSlidingWindow(), which consumes every row merged into the window
  for (int32_t i = 0; i < rows;) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }
    TSKEY calStartTs = srcStartTsCol[i];
    colDataSetVal(pCalStartTsCol, pDestBlock->info.rows, (const char*)(&calStartTs), false);
    STimeWindow win = getSlidingWindow(srcStartTsCol, srcEndTsCol, srcGp, &pInfo->interval, &pSrcBlock->info, &i,
                                       pInfo->partitionSup.needCalc);
    TSKEY       calEndTs = srcStartTsCol[i - 1];
    colDataSetVal(pCalEndTsCol, pDestBlock->info.rows, (const char*)(&calEndTs), false);
    colDataSetVal(pDeUidCol, pDestBlock->info.rows, (const char*)(&srcUid), false);
    colDataSetVal(pStartTsCol, pDestBlock->info.rows, (const char*)(&win.skey), false);
    colDataSetVal(pEndTsCol, pDestBlock->info.rows, (const char*)(&win.ekey), false);
    colDataSetVal(pGpCol, pDestBlock->info.rows, (const char*)(&groupId), false);
    pDestBlock->info.rows++;
  }
  return TSDB_CODE_SUCCESS;
}
1403

1404
// Converts every row of a delete source block into one row of pDestBlock.
//
// pDestBlock is cleaned first. For each source row the start/end timestamps and
// uid are forwarded unchanged; a group id of 0 is resolved through
// getGroupIdByData() using the version preceding the source block's version.
// When a table-name expression is configured (tbnameCalSup.pExprInfo), the
// partition table name stored for the group is attached as a var-string,
// otherwise (or when no name is available) the table-name column is set to NULL.
//
// Returns TSDB_CODE_SUCCESS, or the error reported by blockDataEnsureCapacity().
static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  blockDataCleanup(pDestBlock);
  int32_t rows = pSrcBlock->info.rows;
  if (rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        srcUidData = (uint64_t*)pSrcUidCol->pData;
  SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        srcGp = (uint64_t*)pSrcGpCol->pData;
  ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY*  srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
  TSKEY*  srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
  int64_t version = pSrcBlock->info.version - 1;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t srcUid = srcUidData[i];
    uint64_t groupId = srcGp[i];
    // var-string buffer: length header followed by the name bytes.
    // It must be a char buffer (not an array of pointers) and is re-zeroed for every row.
    char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
    bool hasTbName = false;

    if (groupId == 0) {
      groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
    }

    if (pInfo->tbnameCalSup.pExprInfo) {
      void* parTbname = NULL;
      streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);

      // the lookup may leave parTbname untouched; never copy from NULL
      if (parTbname != NULL) {
        memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
        // the copy fills the whole name area, so force termination before measuring
        tbname[sizeof(tbname) - 1] = '\0';
        size_t nameLen = strlen(varDataVal(tbname));
        varDataSetLen(tbname, nameLen);
        hasTbName = (nameLen > 0);
        streamFreeVal(parTbname);
      }
    }

    appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
                                     hasTbName ? tbname : NULL);
  }
  return TSDB_CODE_SUCCESS;
}

1446 1447 1448 1449
// Fills pDestBlock with the scan ranges derived from pSrcBlock, choosing the
// generator that matches the window kind of this scan (interval, session/state,
// or plain delete result). The destination block is always stamped as
// STREAM_CLEAR with the source version and its ts window is refreshed, whatever
// the generator returned. Returns the generator's result code.
static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
  int32_t rc = TSDB_CODE_SUCCESS;

  if (isIntervalWindow(pInfo)) {
    rc = generateIntervalScanRange(pInfo, pSrcBlock, pDestBlock);
  } else {
    bool sessionLike = isSessionWindow(pInfo) || isStateWindow(pInfo);
    rc = sessionLike ? generateSessionScanRange(pInfo, pSrcBlock, pDestBlock)
                     : generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
  }

  pDestBlock->info.type = STREAM_CLEAR;
  pDestBlock->info.version = pSrcBlock->info.version;
  pDestBlock->info.dataLoad = 1;
  blockDataUpdateTsWindow(pDestBlock, 0);

  return rc;
}

5
54liuyao 已提交
1462
// Prepares the create-table result for pBlock.
//
// pInfo->pCreateTbRes is always cleaned. If neither a table-name expression nor
// a tag expression is configured, the block's partition table name is reset to
// the empty string; otherwise appendCreateTableRow() is invoked for the block's
// group id using row 0 of pBlock, writing into pInfo->pCreateTbRes.
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  blockDataCleanup(pInfo->pCreateTbRes);

  if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
    pBlock->info.parTbName[0] = 0;
    return;
  }

  appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup,
                       pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes);
}

1473 1474
// Appends a single row to a stream "special" block and bumps its row count.
//
// The start/end timestamps are written both to the window columns and to the
// calculate-window columns; uid and group id go to their own columns. pTbName
// is written to the table-name column, or the cell is marked NULL when pTbName
// is NULL. The caller is expected to have reserved capacity for the new row.
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
                                      uint64_t* pGp, void* pTbName) {
  // window range
  SColumnInfoData* pWinStart = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pWinEnd = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  // identity
  SColumnInfoData* pUidData = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupData = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  // calculate range (mirrors the window range)
  SColumnInfoData* pCalcStart = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalcEnd = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  // optional table name
  SColumnInfoData* pNameData = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

  bool nameIsNull = (pTbName == NULL);

  colDataSetVal(pWinStart, pBlock->info.rows, (const char*)pStartTs, false);
  colDataSetVal(pWinEnd, pBlock->info.rows, (const char*)pEndTs, false);
  colDataSetVal(pUidData, pBlock->info.rows, (const char*)pUid, false);
  colDataSetVal(pGroupData, pBlock->info.rows, (const char*)pGp, false);
  colDataSetVal(pCalcStart, pBlock->info.rows, (const char*)pStartTs, false);
  colDataSetVal(pCalcEnd, pBlock->info.rows, (const char*)pEndTs, false);
  colDataSetVal(pNameData, pBlock->info.rows, (const char*)pTbName, nameIsNull);

  pBlock->info.rows += 1;
}

1492
// Runs the update check over every row of pBlock.
//
// For each row the update info is consulted (updateInfoIsUpdated is called for
// every row that is not skipped as expired), and for overdue rows of an already
// inserted table the owning window is examined to see whether it is a closed,
// deleted single-interval window. When `out` is true, rows that are updates or
// hit such a window are recorded in pInfo->pUpdateDataRes (a second row with
// the calculated group id is added for closed windows when partition
// calculation is required), and the result block is finalized at the end.
//
// `invertible` is not used by this function.
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
  if (out) {
    blockDataCleanup(pInfo->pUpdateDataRes);
    // a row can yield up to two result rows, see the loop below
    blockDataEnsureCapacity(pInfo->pUpdateDataRes, pBlock->info.rows * 2);
  }

  SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
  ASSERT(pTsColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
  TSKEY* pTsData = (TSKEY*)pTsColInfo->pData;
  bool   knownTable = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.id.uid);

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    TSKEY* pTs = pTsData + row;

    bool expired = isOverdue(*pTs, &pInfo->twAggSup);
    if (expired && pInfo->igExpired) {
      continue;
    }

    STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
    bool        winClosed = false;
    if (knownTable && expired) {
      SResultRowInfo rowInfo;
      rowInfo.cur.pageId = -1;
      win = getActiveTimeWindow(NULL, &rowInfo, *pTs, &pInfo->interval, TSDB_ORDER_ASC);
      winClosed = isCloseWindow(&win, &pInfo->twAggSup);
    }

    // must check update info first.
    bool updated = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, *pTs);

    bool deletedWin = false;
    if (winClosed && isSignleIntervalWindow(pInfo)) {
      deletedWin = isDeletedStreamWindow(&win, pBlock->info.id.groupId, pInfo->pState, &pInfo->twAggSup);
    }

    if (!out || !(updated || deletedWin)) {
      continue;
    }

    qDebug("stream update check not pass, update %d, closedWin %d", updated, deletedWin);

    uint64_t groupId = 0;
    appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pTs, pTs, &pBlock->info.id.uid, &groupId, NULL);

    if (deletedWin && pInfo->partitionSup.needCalc) {
      groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pBlock, row);
      appendOneRowToStreamSpecialBlock(pInfo->pUpdateDataRes, pTs, pTs, &pBlock->info.id.uid, &groupId, NULL);
    }
  }

  if (!out || pInfo->pUpdateDataRes->info.rows <= 0) {
    return;
  }

  pInfo->pUpdateDataRes->info.version = pBlock->info.version;
  pInfo->pUpdateDataRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
  if (pInfo->partitionSup.needCalc) {
    pInfo->pUpdateDataRes->info.type = STREAM_DELETE_DATA;
  } else {
    pInfo->pUpdateDataRes->info.type = STREAM_CLEAR;
  }
}
L
Liu Jicong 已提交
1539

1540
// Moves the content of a freshly retrieved block into pInfo->pRes.
//
// Copies block identity (rows, uid, version), resolves the group id from the
// table list, assigns every output column that exists in pBlock and fills the
// missing ones with NULLs, appends pseudo (tag/tbname) columns, optionally
// applies the operator filter, refreshes the ts window and finally computes
// the create-table result through calBlockTbName().
//
// The resources of pBlock are always released with blockDataFreeRes(): on the
// normal path and before every long jump. Fatal errors (capacity reservation
// failure, or a pseudo-column error other than "table not exist") abort the
// task through T_LONG_JMP. Returns 0 otherwise.
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
  SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
  SOperatorInfo*  pOperator = pInfo->pStreamScanOp;
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;

  // columns are written below for pBlock->info.rows rows; never continue without the space
  int32_t code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    blockDataFreeRes((SSDataBlock*)pBlock);
    T_LONG_JMP(pTaskInfo->env, code);
  }

  pInfo->pRes->info.rows = pBlock->info.rows;
  pInfo->pRes->info.id.uid = pBlock->info.id.uid;
  pInfo->pRes->info.type = STREAM_NORMAL;
  pInfo->pRes->info.version = pBlock->info.version;

  STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
  pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid);

  // todo extract method
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
    SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i);
    if (!pColMatchInfo->needOutput) {
      continue;
    }

    bool colExists = false;
    for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) {
      SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j);
      if (pResCol->info.colId == pColMatchInfo->colId) {
        SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
        colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info);
        colExists = true;
        break;
      }
    }

    // the required column does not exist in the submit block, set it to all-null values
    if (!colExists) {
      SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId);
      colDataSetNNULL(pDst, 0, pBlockInfo->rows);
    }
  }

  // currently only the tbname pseudo column
  if (pInfo->numOfPseudoExpr > 0) {
    code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
                                  pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
    // ignore the table not exists error, since this table may have been dropped during the scan procedure.
    if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      blockDataFreeRes((SSDataBlock*)pBlock);
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // reset the error code.
    terrno = 0;
  }

  if (filter) {
    doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
  }

  pInfo->pRes->info.dataLoad = 1;
  blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
  blockDataFreeRes((SSDataBlock*)pBlock);

  calBlockTbName(pInfo, pInfo->pRes);
  return 0;
}
5
54liuyao 已提交
1605

L
Liu Jicong 已提交
1606
// Produces the next result block for a queue (subscription) scan.
//
// Three sources are tried in order:
//   1. a pending submit message attached to the task: its blocks are decoded
//      one by one until one yields rows; when exhausted the message is cleared;
//   2. snapshot data read through the table scan operator; once that is
//      exhausted the tsdb reader is closed and the scan seeks to the WAL
//      version following the snapshot version;
//   3. the WAL log, read until a data block yields rows or nothing is left.
// Returns the block holding rows, or NULL when nothing is available or an
// error was logged.
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;
  const char*      id = GET_TASKID(pTaskInfo);

  qDebug("start to exec queue scan, %s", id);

  // 1. pending submit message
  if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
    if (pInfo->tqReader->msg2.msgStr == NULL) {
      SPackedData submit = pTaskInfo->streamInfo.submit;
      int32_t     setRes = tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver);
      if (setRes < 0) {
        qError("submit msg messed up when initing stream submit block %p", submit.msgStr);
        return NULL;
      }
    }

    blockDataCleanup(pInfo->pRes);

    while (tqNextDataBlock(pInfo->tqReader)) {
      SSDataBlock submitBlock = {0};

      int32_t code = tqRetrieveDataBlock2(&submitBlock, pInfo->tqReader, NULL);
      if (code != TSDB_CODE_SUCCESS || submitBlock.info.rows == 0) {
        continue;
      }

      setBlockIntoRes(pInfo, &submitBlock, true);

      if (pInfo->pRes->info.rows > 0) {
        return pInfo->pRes;
      }
    }

    // message fully consumed
    pInfo->tqReader->msg2 = (SPackedData){0};
    pTaskInfo->streamInfo.submit = (SPackedData){0};
    return NULL;
  }

  // 2. snapshot data
  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    SSDataBlock* pSnapshot = doTableScan(pInfo->pTableScanOp);
    if (pSnapshot != NULL && pSnapshot->info.rows > 0) {
      qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64,
             pSnapshot->info.rows, pSnapshot->info.window.skey, pSnapshot->info.window.ekey,
             pInfo->tqReader->pWalReader->curVersion);
      tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pSnapshot->info.id.uid, pSnapshot->info.window.ekey);
      return pSnapshot;
    }

    // snapshot exhausted: release the tsdb reader and continue from the WAL
    STableScanInfo* pScan = pInfo->pTableScanOp->info;
    tsdbReaderClose(pScan->base.dataReader);
    pScan->base.dataReader = NULL;

    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
    if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
      return NULL;
    }
    tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
  }

  // 3. WAL log
  if (pTaskInfo->streamInfo.currentOffset.type != TMQ_OFFSET__LOG) {
    qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
    return NULL;
  }

  for (;;) {
    SFetchRet fetched = {0};
    tqNextBlock(pInfo->tqReader, &fetched);
    // curVersion move to next, so currentOffset = curVersion - 1
    tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1);

    if (fetched.fetchType == FETCH_TYPE__NONE) {
      qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
      return NULL;
    }

    if (fetched.fetchType != FETCH_TYPE__DATA) {
      continue;
    }

    qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, fetched.data.info.rows,
           pTaskInfo->streamInfo.currentOffset.version);
    blockDataCleanup(pInfo->pRes);
    setBlockIntoRes(pInfo, &fetched.data, true);
    if (pInfo->pRes->info.rows > 0) {
      qDebug("doQueueScan get data from log %" PRId64 " rows, return, version:%" PRId64, pInfo->pRes->info.rows,
             pTaskInfo->streamInfo.currentOffset.version);
      return pInfo->pRes;
    }
  }
}

L
Liu Jicong 已提交
1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710
// Copies into pDst only those delete rows of pSrc whose uid is present in the
// reader's table-id hash (pInfo->tqReader->tbIdHash).
//
// Start ts, end ts and uid are copied; the group id and both calculate-window
// columns of every kept row are set to NULL. Afterwards pDst takes over the
// block info of pSrc, with `rows` set to the number of kept rows and its own
// capacity preserved.
//
// Returns TSDB_CODE_SUCCESS, or the error from blockDataEnsureCapacity(), in
// which case pDst is left untouched.
static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) {
  STqReader* pReader = pInfo->tqReader;
  int32_t    rows = pSrc->info.rows;

  int32_t code = blockDataEnsureCapacity(pDst, rows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SColumnInfoData* pSrcStartCol = taosArrayGet(pSrc->pDataBlock, START_TS_COLUMN_INDEX);
  uint64_t*        startCol = (uint64_t*)pSrcStartCol->pData;
  SColumnInfoData* pSrcEndCol = taosArrayGet(pSrc->pDataBlock, END_TS_COLUMN_INDEX);
  uint64_t*        endCol = (uint64_t*)pSrcEndCol->pData;
  SColumnInfoData* pSrcUidCol = taosArrayGet(pSrc->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pSrcUidCol->pData;

  SColumnInfoData* pDstStartCol = taosArrayGet(pDst->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstEndCol = taosArrayGet(pDst->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pDstUidCol = taosArrayGet(pDst->pDataBlock, UID_COLUMN_INDEX);
  // looked up once instead of once per kept row
  SColumnInfoData* pDstGpCol = taosArrayGet(pDst->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pDstCalStartCol = taosArrayGet(pDst->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pDstCalEndCol = taosArrayGet(pDst->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);

  int32_t j = 0;
  for (int32_t i = 0; i < rows; i++) {
    if (taosHashGet(pReader->tbIdHash, &uidCol[i], sizeof(uint64_t))) {
      colDataSetVal(pDstStartCol, j, (const char*)&startCol[i], false);
      colDataSetVal(pDstEndCol, j, (const char*)&endCol[i], false);
      colDataSetVal(pDstUidCol, j, (const char*)&uidCol[i], false);

      colDataSetNULL(pDstGpCol, j);
      colDataSetNULL(pDstCalStartCol, j);
      colDataSetNULL(pDstCalEndCol, j);
      j++;
    }
  }

  // adopt the source block info but keep the destination's own capacity
  uint32_t cap = pDst->info.capacity;
  pDst->info = pSrc->info;
  pDst->info.rows = j;
  pDst->info.capacity = cap;

  return TSDB_CODE_SUCCESS;
}

5
54liuyao 已提交
1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740
// for partition by tag
// Rewrites the group-id column of pBlock with the group id looked up from each
// row's uid (getGroupIdByUid). Nothing is written when the partition support
// requires calculation (partitionSup.needCalc).
static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
  if (pInfo->partitionSup.needCalc) {
    return;
  }

  SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  uint64_t*        uidCol = (uint64_t*)pUidCol->pData;
  int32_t          rows = pBlock->info.rows;

  for (int32_t i = 0; i < rows; i++) {
    uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
    colDataSetVal(pGpCol, i, (const char*)&groupId, false);
  }
}

5
54liuyao 已提交
1746
// Runs the update check for pBlock and selects the next scan mode.
//
// Does nothing when no update info is attached. Otherwise the maximum seen data
// version and the maximum timestamp (using endKey) are advanced, the rows are
// checked with output enabled, and if the check produced result rows the scan
// mode is switched according to the type of the produced block.
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
  if (!pInfo->pUpdateInfo) {
    return;
  }

  pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
  checkUpdateData(pInfo, true, pBlock, true);
  pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);

  if (pInfo->pUpdateDataRes->info.rows <= 0) {
    return;
  }

  pInfo->updateResIndex = 0;
  switch (pInfo->pUpdateDataRes->info.type) {
    case STREAM_CLEAR:
      pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
      break;
    case STREAM_INVERT:
      pInfo->scanMode = STREAM_SCAN_FROM_RES;
      break;
    case STREAM_DELETE_DATA:
      pInfo->scanMode = STREAM_SCAN_FROM_DELETE_DATA;
      break;
    default:
      // any other block type leaves the scan mode unchanged
      break;
  }
}

L
liuyao 已提交
1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777
// Serializes the update info of the stream scan into a newly allocated buffer.
//
// The required size is obtained with a first updateInfoSerialize() call on a
// NULL buffer, then the buffer is allocated and filled by a second call.
// On success *pBuff points to the buffer (allocated with taosMemoryCalloc) and
// its length is returned. *pBuff is set to NULL when nothing was encoded: if
// the reported size is not positive that size is returned as is, and if the
// allocation fails 0 is returned.
int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
  *pBuff = NULL;

  int32_t len = updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
  if (len <= 0) {
    // nothing to allocate; never hand a non-positive size to the allocator
    return len;
  }

  void* pEncoded = taosMemoryCalloc(1, len);
  if (pEncoded == NULL) {
    // report "no data" rather than a length without a buffer
    return 0;
  }

  updateInfoSerialize(pEncoded, len, pInfo->pUpdateInfo);
  *pBuff = pEncoded;
  return len;
}

// other properties are recovered from the execution plan
// Restores the update info of the stream scan from a serialized buffer.
//
// A NULL buffer is ignored. A fresh update info is created and filled from
// pBuff/len; it replaces pInfo->pUpdateInfo only when deserialization succeeds.
// On failure the fresh object is destroyed and pInfo is left unchanged.
// NOTE(review): the previous pInfo->pUpdateInfo is overwritten without being
// released, as before - confirm whether the caller owns/frees it.
void streamScanOperatorDeocde(void* pBuff, int32_t len, SStreamScanInfo* pInfo) {
  if (!pBuff) {
    return;
  }

  SUpdateInfo* pUpInfo = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
  if (pUpInfo == NULL) {
    return;
  }

  int32_t code = updateInfoDeserialize(pBuff, len, pUpInfo);
  if (code != TSDB_CODE_SUCCESS) {
    // do not leak the object created above when the buffer cannot be decoded
    updateInfoDestroy(pUpInfo);
    return;
  }

  pInfo->pUpdateInfo = pUpInfo;
}

L
Liu Jicong 已提交
1785 1786 1787 1788 1789
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
  // NOTE: this operator does never check if current status is done or not
  SExecTaskInfo*   pTaskInfo = pOperator->pTaskInfo;
  SStreamScanInfo* pInfo = pOperator->info;

L
Liu Jicong 已提交
1790
  qDebug("stream scan called");
H
Haojun Liao 已提交
1791

1792 1793
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
L
Liu Jicong 已提交
1794
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1795
    memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
1796
    if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) {
H
Haojun Liao 已提交
1797 1798 1799 1800
      pTSInfo->base.cond.startVersion = 0;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
      qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1801
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
1802
    } else {
H
Haojun Liao 已提交
1803 1804 1805 1806
      pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
      pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
      qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
             pTSInfo->base.cond.endVersion);
5
54liuyao 已提交
1807
      pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
1808
    }
L
Liu Jicong 已提交
1809

H
Haojun Liao 已提交
1810
    tsdbReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1811

H
Haojun Liao 已提交
1812
    pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
1813
    pInfo->pTableScanOp->status = OP_OPENED;
L
Liu Jicong 已提交
1814

L
Liu Jicong 已提交
1815 1816
    pTSInfo->scanTimes = 0;
    pTSInfo->currentGroupId = -1;
L
Liu Jicong 已提交
1817
    pTaskInfo->streamInfo.recoverScanFinished = false;
L
Liu Jicong 已提交
1818 1819
  }

5
54liuyao 已提交
1820 1821
  if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
      pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
L
Liu Jicong 已提交
1822 1823 1824 1825 1826
    if (pInfo->blockRecoverContiCnt > 100) {
      pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
      pInfo->blockRecoverContiCnt = 0;
      return NULL;
    }
5
54liuyao 已提交
1827 1828 1829 1830 1831 1832 1833

    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
        printDataBlock(pInfo->pRecoverRes, "scan recover");
        return pInfo->pRecoverRes;
      } break;
5
54liuyao 已提交
1834 1835 1836 1837
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
1838
        printDataBlock(pInfo->pUpdateRes, "recover update");
5
54liuyao 已提交
1839 1840
        return pInfo->pUpdateRes;
      } break;
1841 1842 1843 1844 1845 1846 1847 1848 1849
      case STREAM_SCAN_FROM_DELETE_DATA: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        printDataBlock(pInfo->pDeleteDataRes, "recover delete");
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
1850 1851 1852 1853 1854 1855
      case STREAM_SCAN_FROM_DATAREADER_RANGE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
1856
          printDataBlock(pSDB, "scan recover update");
5
54liuyao 已提交
1857 1858 1859 1860 1861 1862
          calBlockTbName(pInfo, pSDB);
          return pSDB;
        }
        blockDataCleanup(pInfo->pUpdateDataRes);
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
5
54liuyao 已提交
1863 1864 1865 1866 1867 1868
      default:
        break;
    }

    pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
    if (pInfo->pRecoverRes != NULL) {
L
Liu Jicong 已提交
1869
      pInfo->blockRecoverContiCnt++;
5
54liuyao 已提交
1870
      calBlockTbName(pInfo, pInfo->pRecoverRes);
1871
      if (pInfo->pUpdateInfo) {
5
54liuyao 已提交
1872 1873 1874 1875 1876 1877
        if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
          TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
          pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
        } else {
          doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
        }
1878
      }
5
54liuyao 已提交
1879 1880
      if (pInfo->pCreateTbRes->info.rows > 0) {
        pInfo->scanMode = STREAM_SCAN_FROM_RES;
1881
        printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
5
54liuyao 已提交
1882 1883
        return pInfo->pCreateTbRes;
      }
X
Xiaoyu Wang 已提交
1884
      qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
5
54liuyao 已提交
1885 1886
      printDataBlock(pInfo->pRecoverRes, "scan recover");
      return pInfo->pRecoverRes;
L
Liu Jicong 已提交
1887 1888
    }
    pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
L
Liu Jicong 已提交
1889
    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
H
Haojun Liao 已提交
1890
    tsdbReaderClose(pTSInfo->base.dataReader);
D
dapan1121 已提交
1891

H
Haojun Liao 已提交
1892
    pTSInfo->base.dataReader = NULL;
1893

H
Haojun Liao 已提交
1894 1895
    pTSInfo->base.cond.startVersion = -1;
    pTSInfo->base.cond.endVersion = -1;
L
Liu Jicong 已提交
1896

L
Liu Jicong 已提交
1897
    pTaskInfo->streamInfo.recoverScanFinished = true;
L
Liu Jicong 已提交
1898 1899 1900
    return NULL;
  }

5
54liuyao 已提交
1901
  size_t total = taosArrayGetSize(pInfo->pBlockLists);
5
54liuyao 已提交
1902
// TODO: refactor
L
Liu Jicong 已提交
1903
FETCH_NEXT_BLOCK:
L
Liu Jicong 已提交
1904
  if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {
H
Haojun Liao 已提交
1905
    if (pInfo->validBlockIndex >= total) {
L
Liu Jicong 已提交
1906
      doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
1907
      /*pOperator->status = OP_EXEC_DONE;*/
H
Haojun Liao 已提交
1908 1909 1910
      return NULL;
    }

1911
    int32_t      current = pInfo->validBlockIndex++;
L
Liu Jicong 已提交
1912 1913
    SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current);
    SSDataBlock* pBlock = pPacked->pDataBlock;
5
54liuyao 已提交
1914
    if (pBlock->info.parTbName[0]) {
H
Haojun Liao 已提交
1915
      streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName);
1916
    }
1917

1918
    // TODO move into scan
5
54liuyao 已提交
1919 1920
    pBlock->info.calWin.skey = INT64_MIN;
    pBlock->info.calWin.ekey = INT64_MAX;
1921
    pBlock->info.dataLoad = 1;
L
fix bug  
liuyao 已提交
1922 1923 1924
    if (pInfo->pUpdateInfo) {
      pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
    }
1925
    blockDataUpdateTsWindow(pBlock, 0);
1926
    switch (pBlock->info.type) {
L
Liu Jicong 已提交
1927 1928 1929
      case STREAM_NORMAL:
      case STREAM_GET_ALL:
        return pBlock;
1930 1931 1932
      case STREAM_RETRIEVE: {
        pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
1933
        copyDataBlock(pInfo->pUpdateRes, pBlock);
L
liuyao 已提交
1934
        pInfo->updateResIndex = 0;
1935
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
1936 1937 1938
        updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
      } break;
      case STREAM_DELETE_DATA: {
1939
        printDataBlock(pBlock, "stream scan delete recv");
L
Liu Jicong 已提交
1940
        SSDataBlock* pDelBlock = NULL;
L
Liu Jicong 已提交
1941
        if (pInfo->tqReader) {
L
Liu Jicong 已提交
1942
          pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
L
Liu Jicong 已提交
1943
          filterDelBlockByUid(pDelBlock, pBlock, pInfo);
L
Liu Jicong 已提交
1944 1945
        } else {
          pDelBlock = pBlock;
L
Liu Jicong 已提交
1946
        }
5
54liuyao 已提交
1947 1948
        setBlockGroupIdByUid(pInfo, pDelBlock);
        printDataBlock(pDelBlock, "stream scan delete recv filtered");
5
54liuyao 已提交
1949 1950 1951 1952 1953 1954
        if (pDelBlock->info.rows == 0) {
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
          goto FETCH_NEXT_BLOCK;
        }
1955
        if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
L
Liu Jicong 已提交
1956
          generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
1957
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
L
Liu Jicong 已提交
1958
          printDataBlock(pDelBlock, "stream scan delete result");
H
Haojun Liao 已提交
1959 1960
          blockDataDestroy(pDelBlock);

L
Liu Jicong 已提交
1961 1962 1963 1964 1965
          if (pInfo->pDeleteDataRes->info.rows > 0) {
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1966 1967 1968
        } else {
          pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
          pInfo->updateResIndex = 0;
L
Liu Jicong 已提交
1969
          generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
1970 1971 1972
          prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
          copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
          pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
L
Liu Jicong 已提交
1973 1974 1975 1976
          printDataBlock(pDelBlock, "stream scan delete data");
          if (pInfo->tqReader) {
            blockDataDestroy(pDelBlock);
          }
L
Liu Jicong 已提交
1977
          if (pInfo->pDeleteDataRes->info.rows > 0) {
5
54liuyao 已提交
1978
            pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
L
Liu Jicong 已提交
1979 1980 1981 1982
            return pInfo->pDeleteDataRes;
          } else {
            goto FETCH_NEXT_BLOCK;
          }
1983
        }
1984 1985 1986
      } break;
      default:
        break;
5
54liuyao 已提交
1987
    }
1988
    // printDataBlock(pBlock, "stream scan recv");
1989
    return pBlock;
L
Liu Jicong 已提交
1990
  } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
L
Liu Jicong 已提交
1991
    qDebug("scan mode %d", pInfo->scanMode);
5
54liuyao 已提交
1992 1993 1994
    switch (pInfo->scanMode) {
      case STREAM_SCAN_FROM_RES: {
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
5
54liuyao 已提交
1995
        doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
5
54liuyao 已提交
1996 1997 1998
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
        pInfo->pRes->info.dataLoad = 1;
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
5
54liuyao 已提交
1999 2000 2001
        if (pInfo->pRes->info.rows > 0) {
          return pInfo->pRes;
        }
5
54liuyao 已提交
2002
      } break;
2003
      case STREAM_SCAN_FROM_DELETE_DATA: {
2004 2005 2006 2007 2008 2009 2010
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
        pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
        return pInfo->pDeleteDataRes;
      } break;
5
54liuyao 已提交
2011 2012 2013 2014 2015 2016 2017 2018 2019 2020
      case STREAM_SCAN_FROM_UPDATERES: {
        generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
        prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
        pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
        return pInfo->pUpdateRes;
      } break;
      case STREAM_SCAN_FROM_DATAREADER_RANGE:
      case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
        SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
        if (pSDB) {
2021
          STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
5
54liuyao 已提交
2022 2023
          pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
          checkUpdateData(pInfo, true, pSDB, false);
L
liuyao 已提交
2024
          printDataBlock(pSDB, "stream scan update");
L
Liu Jicong 已提交
2025
          calBlockTbName(pInfo, pSDB);
5
54liuyao 已提交
2026 2027
          return pSDB;
        }
2028
        blockDataCleanup(pInfo->pUpdateDataRes);
5
54liuyao 已提交
2029 2030 2031 2032
        pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
      } break;
      default:
        break;
2033
    }
2034

2035
    SStreamAggSupporter* pSup = pInfo->windowSup.pStreamAggSup;
5
54liuyao 已提交
2036
    if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
2037 2038
      pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
      pInfo->updateResIndex = 0;
5
54liuyao 已提交
2039 2040
      copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
      blockDataCleanup(pSup->pScanBlock);
2041
      prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
2042
      pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
2043
      return pInfo->pUpdateRes;
5
54liuyao 已提交
2044
    }
5
54liuyao 已提交
2045

H
Haojun Liao 已提交
2046 2047
    SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;

2048
    int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists);
2049

L
Liu Jicong 已提交
2050
  NEXT_SUBMIT_BLK:
2051
    while (1) {
L
Liu Jicong 已提交
2052
      if (pInfo->tqReader->msg2.msgStr == NULL) {
2053
        if (pInfo->validBlockIndex >= totBlockNum) {
5
54liuyao 已提交
2054
          updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
L
Liu Jicong 已提交
2055
          doClearBufferedBlocks(pInfo);
L
Liu Jicong 已提交
2056
          qDebug("stream scan return empty, consume block %d", totBlockNum);
L
liuyao 已提交
2057 2058
          // void* buff = NULL;
          // int32_t len = streamScanOperatorEncode(pInfo, &buff);
2059
          // todo(liuyao) save buff
L
liuyao 已提交
2060
          // taosMemoryFreeClear(buff);
2061 2062
          return NULL;
        }
2063

L
Liu Jicong 已提交
2064 2065
        int32_t      current = pInfo->validBlockIndex++;
        SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
2066
        if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
2067 2068 2069 2070
          qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
                 totBlockNum);
          continue;
        }
H
Haojun Liao 已提交
2071 2072
      }

2073 2074
      blockDataCleanup(pInfo->pRes);

2075
      while (tqNextDataBlock(pInfo->tqReader)) {
2076
        SSDataBlock block = {0};
2077

2078
        int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
2079 2080 2081 2082 2083

        if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
          continue;
        }

2084
        setBlockIntoRes(pInfo, &block, false);
2085

5
54liuyao 已提交
2086 2087 2088
        if (pInfo->pCreateTbRes->info.rows > 0) {
          pInfo->scanMode = STREAM_SCAN_FROM_RES;
          return pInfo->pCreateTbRes;
2089 2090
        }

5
54liuyao 已提交
2091
        doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
H
Haojun Liao 已提交
2092
        doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
2093
        pInfo->pRes->info.dataLoad = 1;
2094 2095 2096
        blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);

        if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
2097 2098 2099
          break;
        }
      }
2100
      if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
5
54liuyao 已提交
2101
        break;
J
jiacy-jcy 已提交
2102
      } else {
2103
        continue;
5
54liuyao 已提交
2104
      }
H
Haojun Liao 已提交
2105 2106 2107 2108
    }

    // record the scan action.
    pInfo->numOfExec++;
2109
    pOperator->resultInfo.totalRows += pBlockInfo->rows;
2110
    // printDataBlock(pInfo->pRes, "stream scan");
H
Haojun Liao 已提交
2111

X
Xiaoyu Wang 已提交
2112
    qDebug("scan rows: %" PRId64, pBlockInfo->rows);
L
Liu Jicong 已提交
2113 2114 2115
    if (pBlockInfo->rows > 0) {
      return pInfo->pRes;
    }
2116 2117 2118 2119 2120 2121

    if (pInfo->pUpdateDataRes->info.rows > 0) {
      goto FETCH_NEXT_BLOCK;
    }

    goto NEXT_SUBMIT_BLK;
L
Liu Jicong 已提交
2122 2123 2124
  } else {
    ASSERT(0);
    return NULL;
H
Haojun Liao 已提交
2125 2126 2127
  }
}

H
Haojun Liao 已提交
2128
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
2129 2130 2131
  SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));

  // Transfer the Array of STableKeyInfo into uid list.
H
Haojun Liao 已提交
2132 2133 2134
  size_t size = tableListGetSize(pTableListInfo);
  for (int32_t i = 0; i < size; ++i) {
    STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
2135 2136 2137 2138 2139 2140
    taosArrayPush(tableIdList, &pkeyInfo->uid);
  }

  return tableIdList;
}

2141
/**
 * Produce the next result of the raw (tmq snapshot) scan operator.
 *
 * What happens depends on pTaskInfo->streamInfo.currentOffset.type:
 *  - TMQ_OFFSET__SNAPSHOT_DATA: if a data reader exists and has another block, that block is
 *    returned and currentOffset is moved to the block's uid / window end key. Otherwise the next
 *    table is taken from the snapshot context, the scan is re-prepared for it (or for the log
 *    when the snapshot yields uid 0) and NULL is returned.
 *  - TMQ_OFFSET__SNAPSHOT_META: one meta entry is fetched from the snapshot context. When
 *    sContext->queryMeta is set it is published through pTaskInfo->streamInfo.metaRsp, otherwise
 *    the scan is re-prepared for data. NULL is returned in both cases.
 *  - any other offset type: NULL is returned.
 *
 * Reader failures and a killed task leave this function through T_LONG_JMP.
 *
 * @param pOperator the raw scan operator; pOperator->info is a SStreamRawScanInfo.
 * @return a data block, or NULL when this call produced no data block.
 */
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
  // NOTE: this operator never checks whether the current status is done or not
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  SStreamRawScanInfo* pInfo = pOperator->info;

  // metaRspLen != 0 is used to judge whether the result is meta, so reset it on every call
  pTaskInfo->streamInfo.metaRsp.metaRspLen = 0;
  pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;

  qDebug("tmqsnap doRawScan called");

  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
    if (pInfo->dataReader != NULL) {
      bool    hasNext = false;
      int32_t code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
      if (code != 0) {
        tsdbReleaseDataBlock(pInfo->dataReader);
        T_LONG_JMP(pTaskInfo->env, code);
      }

      if (hasNext) {
        if (isTaskKilled(pTaskInfo)) {
          tsdbReleaseDataBlock(pInfo->dataReader);
          T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
        }

        SSDataBlock* pDataBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
        if (pDataBlock == NULL) {
          T_LONG_JMP(pTaskInfo->env, terrno);
        }

        qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pDataBlock->info.id.uid);
        tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pDataBlock->info.id.uid,
                            pDataBlock->info.window.ekey);
        return pDataBlock;
      }
    }

    // no reader, or the current reader is drained: move on to the next table of the snapshot
    SMetaTableInfo tableInfo = getUidfromSnapShot(pInfo->sContext);
    STqOffsetVal   nextOffset = {0};
    if (tableInfo.uid != 0) {
      tqOffsetResetToData(&nextOffset, tableInfo.uid, INT64_MIN);
      qDebug("tmqsnap change get data uid:%" PRId64 "", tableInfo.uid);
    } else {
      // uid 0: read snapshot done, change to get data from wal
      qDebug("tmqsnap read snapshot done, change to get data from wal");
      tqOffsetResetToLog(&nextOffset, pInfo->sContext->snapVersion);
    }

    qStreamPrepareScan(pTaskInfo, &nextOffset, pInfo->sContext->subType);
    tDeleteSSchemaWrapper(tableInfo.schema);
    return NULL;
  }

  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_META) {
    SSnapContext* pSnapCtx = pInfo->sContext;
    void*         pMeta = NULL;
    int32_t       metaLen = 0;
    int16_t       metaType = 0;
    int64_t       metaUid = 0;

    if (getMetafromSnapShot(pSnapCtx, &pMeta, &metaLen, &metaType, &metaUid) < 0) {
      qError("tmqsnap getMetafromSnapShot error");
      taosMemoryFreeClear(pMeta);
      return NULL;
    }

    if (pSnapCtx->queryMeta) {
      tqOffsetResetToMeta(&pTaskInfo->streamInfo.currentOffset, metaUid);
      pTaskInfo->streamInfo.metaRsp.resMsgType = metaType;
      pTaskInfo->streamInfo.metaRsp.metaRspLen = metaLen;
      pTaskInfo->streamInfo.metaRsp.metaRsp = pMeta;
    } else {
      // change to get data on the next poll request
      STqOffsetVal dataOffset = {0};
      tqOffsetResetToData(&dataOffset, 0, INT64_MIN);
      qStreamPrepareScan(pTaskInfo, &dataOffset, pInfo->sContext->subType);
    }

    return NULL;
  }

  // Any other offset type produces nothing here; the former TMQ_OFFSET__LOG branch is disabled.
  return NULL;
}

wmmhello's avatar
wmmhello 已提交
2254
static void destroyRawScanOperatorInfo(void* param) {
wmmhello's avatar
wmmhello 已提交
2255 2256 2257
  SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
  tsdbReaderClose(pRawScan->dataReader);
  destroySnapContext(pRawScan->sContext);
2258
  tableListDestroy(pRawScan->pTableListInfo);
wmmhello's avatar
wmmhello 已提交
2259 2260 2261
  taosMemoryFree(pRawScan);
}

L
Liu Jicong 已提交
2262 2263 2264
// for subscribing db or stb (not including column),
// if this scan is used, meta data can be return
// and schemas are decided when scanning
2265
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo) {
L
Liu Jicong 已提交
2266 2267 2268 2269 2270
  // create operator
  // create tb reader
  // create meta reader
  // create tq reader

H
Haojun Liao 已提交
2271 2272
  int32_t code = TSDB_CODE_SUCCESS;

2273
  SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
L
Liu Jicong 已提交
2274
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2275
  if (pInfo == NULL || pOperator == NULL) {
H
Haojun Liao 已提交
2276 2277
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
2278 2279
  }

2280
  pInfo->pTableListInfo = tableListCreate();
wmmhello's avatar
wmmhello 已提交
2281 2282
  pInfo->vnode = pHandle->vnode;

2283
  pInfo->sContext = pHandle->sContext;
L
Liu Jicong 已提交
2284 2285
  setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2286

2287
  pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL);
2288
  return pOperator;
H
Haojun Liao 已提交
2289

L
Liu Jicong 已提交
2290
_end:
H
Haojun Liao 已提交
2291 2292 2293 2294
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
L
Liu Jicong 已提交
2295 2296
}

2297
static void destroyStreamScanOperatorInfo(void* param) {
2298
  SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
2299

2300
  if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
5
54liuyao 已提交
2301
    destroyOperatorInfo(pStreamScan->pTableScanOp);
2302
  }
2303

2304 2305 2306
  if (pStreamScan->tqReader) {
    tqCloseReader(pStreamScan->tqReader);
  }
H
Haojun Liao 已提交
2307 2308
  if (pStreamScan->matchInfo.pList) {
    taosArrayDestroy(pStreamScan->matchInfo.pList);
2309
  }
C
Cary Xu 已提交
2310 2311
  if (pStreamScan->pPseudoExpr) {
    destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
L
Liu Jicong 已提交
2312
    taosMemoryFree(pStreamScan->pPseudoExpr);
C
Cary Xu 已提交
2313
  }
C
Cary Xu 已提交
2314

L
Liu Jicong 已提交
2315
  cleanupExprSupp(&pStreamScan->tbnameCalSup);
5
54liuyao 已提交
2316
  cleanupExprSupp(&pStreamScan->tagCalSup);
L
Liu Jicong 已提交
2317

L
Liu Jicong 已提交
2318
  updateInfoDestroy(pStreamScan->pUpdateInfo);
2319 2320 2321 2322
  blockDataDestroy(pStreamScan->pRes);
  blockDataDestroy(pStreamScan->pUpdateRes);
  blockDataDestroy(pStreamScan->pPullDataRes);
  blockDataDestroy(pStreamScan->pDeleteDataRes);
5
54liuyao 已提交
2323
  blockDataDestroy(pStreamScan->pUpdateDataRes);
5
54liuyao 已提交
2324
  blockDataDestroy(pStreamScan->pCreateTbRes);
2325 2326 2327 2328
  taosArrayDestroy(pStreamScan->pBlockLists);
  taosMemoryFree(pStreamScan);
}

2329
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
2330
                                            STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
H
Haojun Liao 已提交
2331
  SArray*          pColIds = NULL;
2332 2333
  SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
  SOperatorInfo*   pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2334

H
Haojun Liao 已提交
2335
  if (pInfo == NULL || pOperator == NULL) {
S
Shengliang Guan 已提交
2336
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2337
    tableListDestroy(pTableListInfo);
2338
    goto _error;
H
Haojun Liao 已提交
2339 2340
  }

2341
  SScanPhysiNode*     pScanPhyNode = &pTableScanNode->scan;
2342
  SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
H
Haojun Liao 已提交
2343

2344
  pInfo->pTagCond = pTagCond;
2345
  pInfo->pGroupTags = pTableScanNode->pGroupTags;
2346

2347
  int32_t numOfCols = 0;
2348 2349
  int32_t code =
      extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
H
Haojun Liao 已提交
2350
  if (code != TSDB_CODE_SUCCESS) {
2351
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2352 2353
    goto _error;
  }
2354

H
Haojun Liao 已提交
2355
  int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList);
H
Haojun Liao 已提交
2356
  pColIds = taosArrayInit(numOfOutput, sizeof(int16_t));
2357
  for (int32_t i = 0; i < numOfOutput; ++i) {
H
Haojun Liao 已提交
2358
    SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i);
2359 2360

    int16_t colId = id->colId;
2361
    taosArrayPush(pColIds, &colId);
2362
    if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
H
Haojun Liao 已提交
2363
      pInfo->primaryTsIndex = id->dstSlotId;
5
54liuyao 已提交
2364
    }
H
Haojun Liao 已提交
2365 2366
  }

L
Liu Jicong 已提交
2367 2368 2369 2370
  if (pTableScanNode->pSubtable != NULL) {
    SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo));
    if (pSubTableExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2371
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2372 2373
      goto _error;
    }
2374

L
Liu Jicong 已提交
2375 2376 2377
    pInfo->tbnameCalSup.pExprInfo = pSubTableExpr;
    createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0);
    if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
2378
      tableListDestroy(pTableListInfo);
L
Liu Jicong 已提交
2379 2380 2381 2382
      goto _error;
    }
  }

2383 2384
  if (pTableScanNode->pTags != NULL) {
    int32_t    numOfTags;
5
54liuyao 已提交
2385
    SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags);
2386 2387
    if (pTagExpr == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2388
      tableListDestroy(pTableListInfo);
2389 2390 2391 2392
      goto _error;
    }
    if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
2393
      tableListDestroy(pTableListInfo);
2394 2395 2396 2397
      goto _error;
    }
  }

L
Liu Jicong 已提交
2398
  pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
H
Haojun Liao 已提交
2399
  if (pInfo->pBlockLists == NULL) {
2400
    terrno = TSDB_CODE_OUT_OF_MEMORY;
2401
    tableListDestroy(pTableListInfo);
2402
    goto _error;
H
Haojun Liao 已提交
2403 2404
  }

5
54liuyao 已提交
2405
  if (pHandle->vnode) {
2406
    SOperatorInfo*  pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
L
Liu Jicong 已提交
2407
    STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
2408
    if (pHandle->version > 0) {
H
Haojun Liao 已提交
2409
      pTSInfo->base.cond.endVersion = pHandle->version;
2410
    }
L
Liu Jicong 已提交
2411

2412
    STableKeyInfo* pList = NULL;
5
54liuyao 已提交
2413
    int32_t        num = 0;
2414
    tableListGetGroupList(pTableListInfo, 0, &pList, &num);
2415

2416
    if (pHandle->initTableReader) {
L
Liu Jicong 已提交
2417
      pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
H
Haojun Liao 已提交
2418
      pTSInfo->base.dataReader = NULL;
L
Liu Jicong 已提交
2419 2420
    }

L
Liu Jicong 已提交
2421 2422 2423 2424
    if (pHandle->initTqReader) {
      ASSERT(pHandle->tqReader == NULL);
      pInfo->tqReader = tqOpenReader(pHandle->vnode);
      ASSERT(pInfo->tqReader);
2425
    } else {
L
Liu Jicong 已提交
2426 2427
      ASSERT(pHandle->tqReader);
      pInfo->tqReader = pHandle->tqReader;
2428 2429
    }

2430
    pInfo->pUpdateInfo = NULL;
2431
    pInfo->pTableScanOp = pTableScanOp;
2432 2433 2434
    if (pInfo->pTableScanOp->pTaskInfo->streamInfo.pState) {
      streamStateSetNumber(pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, -1);
    }
L
Liu Jicong 已提交
2435

L
Liu Jicong 已提交
2436 2437
    pInfo->readHandle = *pHandle;
    pInfo->tableUid = pScanPhyNode->uid;
L
Liu Jicong 已提交
2438
    pTaskInfo->streamInfo.snapshotVer = pHandle->version;
5
54liuyao 已提交
2439 2440
    pInfo->pCreateTbRes = buildCreateTableBlock(&pInfo->tbnameCalSup, &pInfo->tagCalSup);
    blockDataEnsureCapacity(pInfo->pCreateTbRes, 8);
L
Liu Jicong 已提交
2441

L
Liu Jicong 已提交
2442
    // set the extract column id to streamHandle
L
Liu Jicong 已提交
2443
    tqReaderSetColIdList(pInfo->tqReader, pColIds);
2444
    SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo);
2445
    code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
L
Liu Jicong 已提交
2446 2447 2448 2449
    if (code != 0) {
      taosArrayDestroy(tableIdList);
      goto _error;
    }
2450

L
Liu Jicong 已提交
2451
    taosArrayDestroy(tableIdList);
H
Haojun Liao 已提交
2452
    memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond));
L
Liu Jicong 已提交
2453 2454
  } else {
    taosArrayDestroy(pColIds);
2455
    tableListDestroy(pTableListInfo);
H
Haojun Liao 已提交
2456
    pColIds = NULL;
5
54liuyao 已提交
2457 2458
  }

2459 2460 2461 2462 2463
  // create the pseduo columns info
  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
  }

H
Haojun Liao 已提交
2464 2465 2466 2467 2468
  code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
2469
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2470
  pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
2471
  pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
L
Liu Jicong 已提交
2472
  pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
2473
  pInfo->groupId = 0;
2474
  pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
2475
  pInfo->pStreamScanOp = pOperator;
2476
  pInfo->deleteDataIndex = 0;
2477
  pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
5
54liuyao 已提交
2478
  pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
2479
  pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
X
Xiaoyu Wang 已提交
2480
  pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
2481
  pInfo->partitionSup.needCalc = false;
5
54liuyao 已提交
2482 2483
  pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
  pInfo->igExpired = pTableScanNode->igExpired;
2484
  pInfo->twAggSup.maxTs = INT64_MIN;
L
liuyao 已提交
2485
  pInfo->pState = NULL;
L
Liu Jicong 已提交
2486

2487 2488
  // todo(liuyao) get buff from rocks db;
  void*   buff = NULL;
L
liuyao 已提交
2489 2490 2491
  int32_t len = 0;
  streamScanOperatorDeocde(buff, len, pInfo);

L
Liu Jicong 已提交
2492 2493
  setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2494
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
H
Haojun Liao 已提交
2495

2496
  __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
L
Liu Jicong 已提交
2497 2498
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
2499

H
Haojun Liao 已提交
2500
  return pOperator;
2501

L
Liu Jicong 已提交
2502
_error:
H
Haojun Liao 已提交
2503 2504 2505 2506 2507 2508 2509 2510
  if (pColIds != NULL) {
    taosArrayDestroy(pColIds);
  }

  if (pInfo != NULL) {
    destroyStreamScanOperatorInfo(pInfo);
  }

2511 2512
  taosMemoryFreeClear(pOperator);
  return NULL;
H
Haojun Liao 已提交
2513 2514
}

2515
/**
 * Fill the operator's result block with one row per table, starting at pInfo->curPos and
 * stopping at the end of the table list or at the result capacity.
 *
 * For each output expression the row receives either the table name (scan pseudo column
 * functions) or the value of the referenced tag. When the last table has been consumed the
 * operator is marked completed, and the task status is then set to TASK_COMPLETED.
 *
 * A failed table meta lookup leaves this function through T_LONG_JMP after clearing the reader.
 *
 * @param pOperator the tag scan operator; pOperator->info is a STagScanInfo.
 * @return the filled result block, or NULL when the operator is already done, the table list
 *         is empty, or no row was produced.
 */
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STagScanInfo*  pInfo = pOperator->info;
  SExprInfo*     pExprs = &pOperator->exprSupp.pExprInfo[0];
  SSDataBlock*   pResBlock = pInfo->pRes;
  blockDataCleanup(pResBlock);

  int32_t numOfTables = tableListGetSize(pInfo->pTableListInfo);
  if (numOfTables == 0) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
    return NULL;
  }

  char        nameBuf[512] = {0};
  int32_t     numOfRows = 0;
  SMetaReader reader = {0};
  metaReaderInit(&reader, pInfo->readHandle.meta, 0);

  while (pInfo->curPos < numOfTables && numOfRows < pOperator->resultInfo.capacity) {
    STableKeyInfo* pKey = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);

    int32_t code = metaGetTableEntryByUid(&reader, pKey->uid);
    tDecoderClear(&reader.coder);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pKey->uid, tstrerror(terrno),
             GET_TASKID(pTaskInfo));
      metaReaderClear(&reader);
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    for (int32_t col = 0; col < pOperator->exprSupp.numOfExprs; ++col) {
      SExprInfo*       pOneExpr = &pExprs[col];
      SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, pOneExpr->base.resSchema.slotId);

      // refactor later
      if (fmIsScanPseudoColumnFunc(pOneExpr->pExpr->_function.functionId)) {
        // pseudo column: emit the table name as a var-string
        STR_TO_VARSTR(nameBuf, reader.me.name);
        colDataSetVal(pDst, numOfRows, nameBuf, false);
        continue;
      }

      // otherwise it is a tag value
      STagVal tagVal = {0};
      tagVal.cid = pOneExpr->base.pParam[0].pCol->colId;
      const char* pTag = metaGetTableTagVal(reader.me.ctbEntry.pTags, pDst->info.type, &tagVal);

      // json tags are used as returned; other non-NULL tags are converted first
      char* pData = (char*)pTag;
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && pTag != NULL) {
        pData = tTagValToData((const STagVal*)pTag, false);
      }

      colDataSetVal(pDst, numOfRows, pData,
                    (pData == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(pData)));

      // converted var-length values are freed once they have been set into the column
      if (pDst->info.type != TSDB_DATA_TYPE_JSON && pTag != NULL && IS_VAR_DATA_TYPE(((const STagVal*)pTag)->type) &&
          pData != NULL) {
        taosMemoryFree(pData);
      }
    }

    numOfRows += 1;
    pInfo->curPos += 1;
    if (pInfo->curPos >= numOfTables) {
      setOperatorCompleted(pOperator);
    }
  }

  metaReaderClear(&reader);

  if (pOperator->status == OP_EXEC_DONE) {
    setTaskStatus(pTaskInfo, TASK_COMPLETED);
  }

  pResBlock->info.rows = numOfRows;
  pOperator->resultInfo.totalRows += numOfRows;

  if (pResBlock->info.rows == 0) {
    return NULL;
  }
  return pInfo->pRes;
}

2596
static void destroyTagScanOperatorInfo(void* param) {
H
Haojun Liao 已提交
2597 2598
  STagScanInfo* pInfo = (STagScanInfo*)param;
  pInfo->pRes = blockDataDestroy(pInfo->pRes);
H
Haojun Liao 已提交
2599
  taosArrayDestroy(pInfo->matchInfo.pList);
2600
  pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
D
dapan1121 已提交
2601
  taosMemoryFreeClear(param);
H
Haojun Liao 已提交
2602 2603
}

S
slzhou 已提交
2604
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
X
Xiaoyu Wang 已提交
2605
                                         STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
2606
  STagScanInfo*  pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
H
Haojun Liao 已提交
2607 2608 2609 2610 2611
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

2612 2613 2614 2615
  SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;

  int32_t    numOfExprs = 0;
  SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
2616
  int32_t    code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
2617 2618 2619
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2620

H
Haojun Liao 已提交
2621 2622
  int32_t num = 0;
  code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo);
2623 2624 2625
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
2626

2627
  pInfo->pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
2628
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
2629 2630
  pInfo->readHandle = *pReadHandle;
  pInfo->curPos = 0;
2631

L
Liu Jicong 已提交
2632 2633
  setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
                  pTaskInfo);
2634
  initResultSizeInfo(&pOperator->resultInfo, 4096);
2635 2636
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

L
Liu Jicong 已提交
2637 2638
  pOperator->fpSet =
      createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
H
Haojun Liao 已提交
2639 2640

  return pOperator;
2641

2642
_error:
H
Haojun Liao 已提交
2643 2644 2645 2646 2647
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return NULL;
}
2648

dengyihao's avatar
dengyihao 已提交
2649
static SSDataBlock* getTableDataBlockImpl(void* param) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2650 2651 2652 2653 2654 2655
  STableMergeScanSortSourceParam* source = param;
  SOperatorInfo*                  pOperator = source->pOperator;
  STableMergeScanInfo*            pInfo = pOperator->info;
  SExecTaskInfo*                  pTaskInfo = pOperator->pTaskInfo;
  int32_t                         readIdx = source->readerIdx;
  SSDataBlock*                    pBlock = source->inputBlock;
D
dapan1121 已提交
2656
  int32_t                         code = 0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2657

H
Haojun Liao 已提交
2658
  SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
dengyihao's avatar
opt mem  
dengyihao 已提交
2659

L
Liu Jicong 已提交
2660
  int64_t      st = taosGetTimestampUs();
2661
  void*        p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
H
Haojun Liao 已提交
2662
  SReadHandle* pHandle = &pInfo->base.readHandle;
dengyihao's avatar
dengyihao 已提交
2663

D
dapan1121 已提交
2664
  if (NULL == source->dataReader || !source->multiReader) {
D
dapan1121 已提交
2665
    code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false);
D
dapan1121 已提交
2666 2667 2668
    if (code != 0) {
      T_LONG_JMP(pTaskInfo->env, code);
    }
dengyihao's avatar
dengyihao 已提交
2669
  }
dengyihao's avatar
opt mem  
dengyihao 已提交
2670

D
dapan1121 已提交
2671
  pInfo->base.dataReader = source->dataReader;
H
Haojun Liao 已提交
2672
  STsdbReader* reader = pInfo->base.dataReader;
X
Xiaoyu Wang 已提交
2673
  bool         hasNext = false;
2674
  qTrace("tsdb/read-table-data: %p, enter next reader", reader);
D
dapan1121 已提交
2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686

  while (true) {
    code = tsdbNextDataBlock(reader, &hasNext);
    if (code != 0) {
      tsdbReleaseDataBlock(reader);
      pInfo->base.dataReader = NULL;
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (!hasNext) {
      break;
    }
X
Xiaoyu Wang 已提交
2687

H
Haojun Liao 已提交
2688
    if (isTaskKilled(pTaskInfo)) {
X
Xiaoyu Wang 已提交
2689
      tsdbReleaseDataBlock(reader);
D
dapan1121 已提交
2690
      pInfo->base.dataReader = NULL;
2691
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2692 2693 2694
    }

    // process this data block based on the probabilities
H
Haojun Liao 已提交
2695
    bool processThisBlock = processBlockWithProbability(&pInfo->sample);
dengyihao's avatar
opt mem  
dengyihao 已提交
2696 2697 2698 2699
    if (!processThisBlock) {
      continue;
    }

H
Haojun Liao 已提交
2700
    if (pQueryCond->order == TSDB_ORDER_ASC) {
dengyihao's avatar
opt mem  
dengyihao 已提交
2701 2702 2703 2704
      pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
    } else {
      pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
    }
dengyihao's avatar
opt mem  
dengyihao 已提交
2705 2706

    uint32_t status = 0;
2707
    code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
S
slzhou 已提交
2708
    //    code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
dengyihao's avatar
opt mem  
dengyihao 已提交
2709
    if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
2710
      T_LONG_JMP(pTaskInfo->env, code);
dengyihao's avatar
opt mem  
dengyihao 已提交
2711 2712 2713 2714 2715 2716 2717
    }

    // current block is filter out according to filter condition, continue load the next block
    if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
      continue;
    }

2718
    pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
dengyihao's avatar
opt mem  
dengyihao 已提交
2719

H
Haojun Liao 已提交
2720
    pOperator->resultInfo.totalRows += pBlock->info.rows;
H
Haojun Liao 已提交
2721
    pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
dengyihao's avatar
opt mem  
dengyihao 已提交
2722

2723
    qTrace("tsdb/read-table-data: %p, close reader", reader);
D
dapan1121 已提交
2724 2725 2726 2727
    if (!source->multiReader) {
      tsdbReaderClose(pInfo->base.dataReader);
      source->dataReader = NULL;
    }
H
Haojun Liao 已提交
2728
    pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2729 2730
    return pBlock;
  }
H
Haojun Liao 已提交
2731

D
dapan1121 已提交
2732 2733 2734 2735
  if (!source->multiReader) {
    tsdbReaderClose(pInfo->base.dataReader);
    source->dataReader = NULL;
  }
H
Haojun Liao 已提交
2736
  pInfo->base.dataReader = NULL;
dengyihao's avatar
opt mem  
dengyihao 已提交
2737 2738 2739
  return NULL;
}

2740 2741 2742
// Build a one-element SArray of SBlockOrderInfo that orders by the primary
// timestamp column.
//
// colMatchInfo  array of SColMatchItem; the dstSlotId of the item whose colId is
//               PRIMARYKEY_TIMESTAMP_COL_ID becomes the sort slot (the last such
//               item wins; slot 0 is used when none is present)
// order         sort order stored in the entry
//
// Returns the newly created array.
SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
  int32_t tsSlot = 0;

  for (int32_t idx = 0; idx < taosArrayGetSize(colMatchInfo); ++idx) {
    SColMatchItem* pItem = taosArrayGet(colMatchInfo, idx);
    if (pItem->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
      continue;
    }
    tsSlot = pItem->dstSlotId;
  }

  SBlockOrderInfo orderInfo = {.order = order, .slotId = tsSlot, .nullFirst = NULL_ORDER_FIRST};

  SArray* pOrderList = taosArrayInit(1, sizeof(SBlockOrderInfo));
  taosArrayPush(pOrderList, &orderInfo);
  return pOrderList;
}

H
Haojun Liao 已提交
2760
// Copy a query condition, giving the copy its own column list.
//
// All fields of *src are copied into *dst, then dst->colList is replaced by a
// freshly allocated array holding a copy of src->colList. The caller releases
// dst->colList with taosMemoryFree.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the column list
// cannot be allocated; in that case dst->colList is NULL.
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
  memcpy(dst, src, sizeof(*dst));

  dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
  if (dst->colList == NULL) {
    // A zero-sized request may legitimately yield NULL; only a non-empty list is a failure.
    return (src->numOfCols > 0) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS;
  }

  for (int i = 0; i < src->numOfCols; i++) {
    dst->colList[i] = src->colList[i];
  }
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
2768

2769
// Start the merge scan of the table group that begins at pInfo->tableStartIndex.
//
// Steps:
//   1. find the last consecutive table whose groupId equals pInfo->groupId and
//      store its index in pInfo->tableEndIndex;
//   2. create a multi-source merge sort handle with one buffer page per table
//      plus one extra page;
//   3. for every table of the group, append a private copy of the query
//      condition to pInfo->queryConds and a sort-source parameter (with its own
//      input block) to pInfo->sortSourceParams, and register a sort source
//      that fetches through getTableDataBlockImpl;
//   4. open the sort handle.
//
// Returns TSDB_CODE_SUCCESS. Failures long-jump to pTaskInfo->env.
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  int32_t              code = TSDB_CODE_SUCCESS;

  {
    size_t  numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
    int32_t i = pInfo->tableStartIndex + 1;
    for (; i < numOfTables; ++i) {
      STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
      if (tableKeyInfo->groupId != pInfo->groupId) {
        break;
      }
    }
    pInfo->tableEndIndex = i - 1;
  }

  int32_t tableStartIdx = pInfo->tableStartIndex;
  int32_t tableEndIdx = pInfo->tableEndIndex;

  pInfo->base.dataReader = NULL;

  // todo the total available buffer should be determined by total capacity of buffer of this task.
  // the additional one is reserved for merge result
  pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
  int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
  pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
                                             pInfo->pSortInputBlock, pTaskInfo->id.str);

  tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);

  // one table has one data block
  int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
  bool    multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM);
  pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));

  for (int32_t i = 0; i < numOfTable; ++i) {
    // Copy the condition first: if that fails nothing has been appended for
    // this table yet, so sortSourceParams and queryConds keep the same length.
    SQueryTableDataCond cond;
    code = dumpQueryTableCond(&pInfo->base.cond, &cond);
    if (code != TSDB_CODE_SUCCESS) {
      T_LONG_JMP(pTaskInfo->env, code);
    }

    STableMergeScanSortSourceParam param = {0};
    param.readerIdx = i;
    param.pOperator = pOperator;
    param.multiReader = multiReader;
    param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
    blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);

    taosArrayPush(pInfo->sortSourceParams, &param);
    taosArrayPush(pInfo->queryConds, &cond);
  }

  for (int32_t i = 0; i < numOfTable; ++i) {
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
    if (ps == NULL) {
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
    }

    STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
    ps->param = param;
    ps->onlyRef = true;
    tsortAddSource(pInfo->pSortHandle, ps);
  }

  code = tsortOpen(pInfo->pSortHandle);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, terrno);
  }

  return TSDB_CODE_SUCCESS;
}

// Finish the merge scan of the current table group.
//
// Accumulates the sort handle's execution statistics into pInfo->sortExecInfo,
// releases every per-table input block and reader, empties sortSourceParams,
// destroys the sort handle, frees the per-table query conditions and resets
// the limit info for the next group.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
  STableMergeScanInfo* pScan = pOperator->info;
  int32_t              numOfTable = taosArrayGetSize(pScan->queryConds);

  // method and buffer are overwritten; the counters are summed over all groups
  SSortExecInfo groupStat = tsortGetSortExecInfo(pScan->pSortHandle);
  pScan->sortExecInfo.sortMethod = groupStat.sortMethod;
  pScan->sortExecInfo.sortBuffer = groupStat.sortBuffer;
  pScan->sortExecInfo.loops += groupStat.loops;
  pScan->sortExecInfo.readBytes += groupStat.readBytes;
  pScan->sortExecInfo.writeBytes += groupStat.writeBytes;

  for (int32_t idx = 0; idx < numOfTable; ++idx) {
    STableMergeScanSortSourceParam* pSource = taosArrayGet(pScan->sortSourceParams, idx);
    blockDataDestroy(pSource->inputBlock);
    tsdbReaderClose(pSource->dataReader);
    pSource->dataReader = NULL;
  }
  taosArrayClear(pScan->sortSourceParams);

  tsortDestroySortHandle(pScan->pSortHandle);
  pScan->pSortHandle = NULL;

  for (int32_t idx = 0; idx < numOfTable; ++idx) {
    SQueryTableDataCond* pCond = taosArrayGet(pScan->queryConds, idx);
    taosMemoryFree(pCond->colList);
  }
  taosArrayDestroy(pScan->queryConds);
  pScan->queryConds = NULL;

  resetLimitInfoForNextGroup(&pScan->limitInfo);
  return TSDB_CODE_SUCCESS;
}

2870 2871
// all data produced by this function only belongs to one group
// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
L
Liu Jicong 已提交
2872 2873
SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                              SOperatorInfo* pOperator) {
2874 2875 2876
  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

2877
  blockDataCleanup(pResBlock);
2878 2879

  while (1) {
2880
    STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
2881 2882 2883 2884
    if (pTupleHandle == NULL) {
      break;
    }

2885 2886
    appendOneRowToDataBlock(pResBlock, pTupleHandle);
    if (pResBlock->info.rows >= capacity) {
2887 2888 2889 2890
      break;
    }
  }

2891
  bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
D
dapan1121 已提交
2892
  qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
2893
         pInfo->limitInfo.numOfOutputRows);
2894

2895
  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907
}

// Produce the next result block of the table merge scan.
//
// Tables are processed group by group, where a group is a run of consecutive
// tables in the table list that share a groupId. The first call starts the
// first group. When a group yields no more rows it is stopped and the next one
// is started. The operator is marked completed once the last group is drained
// or when the table list is empty.
//
// Returns a block tagged with the current groupId, or NULL when there is no
// more data. Errors and task cancellation long-jump to pTaskInfo->env.
SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  STableMergeScanInfo* pInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;

  int32_t code = pOperator->fpSet._openFn(pOperator);
  if (code != TSDB_CODE_SUCCESS) {
    T_LONG_JMP(pTaskInfo->env, code);
  }

  size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);

  // first call: set up the first group
  if (!pInfo->hasGroupId) {
    pInfo->hasGroupId = true;

    if (tableListSize == 0) {
      setOperatorCompleted(pOperator);
      return NULL;
    }

    pInfo->tableStartIndex = 0;
    STableKeyInfo* pFirstKey = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
    pInfo->groupId = pFirstKey->groupId;
    startGroupTableMergeScan(pOperator);
  }

  while (pInfo->tableStartIndex < tableListSize) {
    if (isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock,
                                                           pOperator->resultInfo.capacity, pOperator);
    if (pBlock != NULL) {
      pBlock->info.id.groupId = pInfo->groupId;
      pOperator->resultInfo.totalRows += pBlock->info.rows;
      return pBlock;
    }

    // Data of this group are all dumped, let's try the next group
    stopGroupTableMergeScan(pOperator);
    if (pInfo->tableEndIndex >= tableListSize - 1) {
      setOperatorCompleted(pOperator);
      break;
    }

    pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
    STableKeyInfo* pNextKey = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
    pInfo->groupId = pNextKey->groupId;
    startGroupTableMergeScan(pOperator);
    resetLimitInfoForNextGroup(&pInfo->limitInfo);
  }

  return NULL;
}

2954
void destroyTableMergeScanOperatorInfo(void* param) {
2955
  STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
H
Haojun Liao 已提交
2956
  cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
2957

dengyihao's avatar
dengyihao 已提交
2958 2959 2960
  int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds);

  for (int32_t i = 0; i < numOfTable; i++) {
H
Haojun Liao 已提交
2961 2962
    STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
    blockDataDestroy(p->inputBlock);
D
dapan1121 已提交
2963 2964
    tsdbReaderClose(p->dataReader);
    p->dataReader = NULL;
2965
  }
H
Haojun Liao 已提交
2966

D
dapan1121 已提交
2967 2968 2969
  tsdbReaderClose(pTableScanInfo->base.dataReader);
  pTableScanInfo->base.dataReader = NULL;

2970
  taosArrayDestroy(pTableScanInfo->sortSourceParams);
dengyihao's avatar
dengyihao 已提交
2971 2972
  tsortDestroySortHandle(pTableScanInfo->pSortHandle);
  pTableScanInfo->pSortHandle = NULL;
2973

dengyihao's avatar
opt mem  
dengyihao 已提交
2974 2975 2976
  for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
    SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
    taosMemoryFree(pCond->colList);
2977 2978
  }

2979 2980
  taosArrayDestroy(pTableScanInfo->queryConds);
  destroyTableScanBase(&pTableScanInfo->base);
2981 2982 2983 2984 2985

  pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
  pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);

  taosArrayDestroy(pTableScanInfo->pSortInfo);
D
dapan1121 已提交
2986
  taosMemoryFreeClear(param);
2987 2988 2989 2990
}

// Explain callback of the table-merge-scan operator.
//
// Allocates an STableMergeScanExecInfo, fills it with the operator's block
// read recorder and sort statistics, and hands it to the caller.
//
// pOptr         operator to report on (must not be NULL)
// pOptrExplain  receives the newly allocated STableMergeScanExecInfo; set to NULL on failure
// len           receives the size in bytes of *pOptrExplain; set to 0 on failure
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the info cannot
// be allocated.
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  ASSERT(pOptr != NULL);

  // TODO: merge these two info into one struct
  STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
  if (execInfo == NULL) {
    *pOptrExplain = NULL;
    *len = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableMergeScanInfo* pInfo = pOptr->info;
  execInfo->blockRecorder = pInfo->base.readRecorder;
  execInfo->sortExecInfo = pInfo->sortExecInfo;

  *pOptrExplain = execInfo;
  *len = sizeof(STableMergeScanExecInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
3003
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
3004
                                                STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
3005 3006 3007 3008 3009
  STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
  SOperatorInfo*       pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }
3010

3011 3012 3013
  SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;

  int32_t numOfCols = 0;
3014
  int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID,
H
Haojun Liao 已提交
3015
                                     &pInfo->base.matchInfo);
H
Haojun Liao 已提交
3016 3017 3018
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }
3019

H
Haojun Liao 已提交
3020
  code = initQueryTableDataCond(&pInfo->base.cond, pTableScanNode);
3021
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
3022
    taosArrayDestroy(pInfo->base.matchInfo.pList);
3023 3024 3025 3026
    goto _error;
  }

  if (pTableScanNode->scan.pScanPseudoCols != NULL) {
H
Haojun Liao 已提交
3027
    SExprSupp* pSup = &pInfo->base.pseudoSup;
3028 3029
    pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
    pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
3030 3031 3032 3033
  }

  pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};

H
Haojun Liao 已提交
3034 3035 3036 3037 3038 3039
  pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
  if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) {
    code = terrno;
    goto _error;
  }

H
Haojun Liao 已提交
3040 3041
  pInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD;
  pInfo->base.scanFlag = MAIN_SCAN;
H
Haojun Liao 已提交
3042
  pInfo->base.readHandle = *readHandle;
3043 3044 3045

  pInfo->base.limitInfo.limit.limit = -1;
  pInfo->base.limitInfo.slimit.limit = -1;
3046
  pInfo->base.pTableListInfo = pTableListInfo;
H
Haojun Liao 已提交
3047

3048
  pInfo->sample.sampleRatio = pTableScanNode->ratio;
L
Liu Jicong 已提交
3049
  pInfo->sample.seed = taosGetTimestampSec();
H
Haojun Liao 已提交
3050 3051 3052 3053 3054 3055

  code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

H
Haojun Liao 已提交
3056
  initResultSizeInfo(&pOperator->resultInfo, 1024);
H
Haojun Liao 已提交
3057
  pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
H
Haojun Liao 已提交
3058 3059
  blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

3060
  pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
3061

H
Haojun Liao 已提交
3062
  pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
3063
  pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
3064
  initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
3065

dengyihao's avatar
dengyihao 已提交
3066
  int32_t  rowSize = pInfo->pResBlock->info.rowSize;
A
Alex Duan 已提交
3067 3068
  uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
  pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
3069

L
Liu Jicong 已提交
3070 3071
  setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3072
  pOperator->exprSupp.numOfExprs = numOfCols;
3073

3074 3075
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
                                         optrDefaultBufFn, getTableMergeScanExplainExecInfo);
3076 3077 3078 3079 3080 3081 3082 3083 3084
  pOperator->cost.openCost = 0;
  return pOperator;

_error:
  pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
  taosMemoryFree(pInfo);
  taosMemoryFree(pOperator);
  return NULL;
}
S
shenglian zhou 已提交
3085 3086 3087 3088

// ====================================================================================================================
// TableCountScanOperator
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator);
S
slzhou 已提交
3089
static void         destoryTableCountScanOperator(void* param);
S
slzhou 已提交
3090 3091 3092 3093 3094 3095
static void         buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName, tb_uid_t stbUid);
static void         buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                                   SSDataBlock* pRes, char* dbName);
static void         buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                              STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName);
L
Liu Jicong 已提交
3096 3097
static void         buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName);
S
slzhou 已提交
3098 3099 3100 3101 3102 3103 3104
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes);
static void         buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                                STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                                size_t perfdbTableNum);
static void         buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                               size_t infodbTableNum, size_t perfdbTableNum);
S
slzhou 已提交
3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165
static const char*  GROUP_TAG_DB_NAME = "db_name";
static const char*  GROUP_TAG_STABLE_NAME = "stable_name";

// Record the output slot ids of the "db_name" and "stable_name" scan columns.
//
// scanCols  list of target nodes wrapping column nodes; may be NULL
// supp      receives dbNameSlotId / stbNameSlotId for the columns that are present
//
// Returns TSDB_CODE_QRY_SYS_ERROR when a list entry is not a target node
// wrapping a column node, otherwise TSDB_CODE_SUCCESS.
int32_t tblCountScanGetGroupTagsSlotId(const SNodeList* scanCols, STableCountScanSupp* supp) {
  if (scanCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCur = NULL;
  FOREACH(pCur, scanCols) {
    if (nodeType(pCur) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pCur;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SColumnNode* pCol = (SColumnNode*)(pTarget->pExpr);
    if (0 == strcmp(pCol->colName, GROUP_TAG_DB_NAME)) {
      supp->dbNameSlotId = pTarget->slotId;
    } else if (0 == strcmp(pCol->colName, GROUP_TAG_STABLE_NAME)) {
      supp->stbNameSlotId = pTarget->slotId;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Record the output slot id of the table-count pseudo column.
//
// pseudoCols  list of target nodes wrapping function nodes; may be NULL
// supp        receives tbCountSlotId when a FUNCTION_TYPE_TABLE_COUNT function is present
//
// Returns TSDB_CODE_QRY_SYS_ERROR when a list entry is not a target node
// wrapping a function node, otherwise TSDB_CODE_SUCCESS.
int32_t tblCountScanGetCountSlotId(const SNodeList* pseudoCols, STableCountScanSupp* supp) {
  if (pseudoCols == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCur = NULL;
  FOREACH(pCur, pseudoCols) {
    if (nodeType(pCur) != QUERY_NODE_TARGET) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    STargetNode* pTarget = (STargetNode*)pCur;
    if (nodeType(pTarget->pExpr) != QUERY_NODE_FUNCTION) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SFunctionNode* pFunc = (SFunctionNode*)(pTarget->pExpr);
    if (pFunc->funcType != FUNCTION_TYPE_TABLE_COUNT) {
      continue;
    }
    supp->tbCountSlotId = pTarget->slotId;
  }

  return TSDB_CODE_SUCCESS;
}

// Derive the grouping flags or the name filters of a table count scan.
//
// Without group tags, the database and table name of `tableName` are copied
// into supp->dbNameFilter / supp->stbNameFilter. With group tags,
// supp->groupByDbName / supp->groupByStbName are set for the "db_name" /
// "stable_name" columns found in the list.
//
// Returns TSDB_CODE_QRY_SYS_ERROR when a group tag is not a column node,
// otherwise TSDB_CODE_SUCCESS.
int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCountScanSupp* supp) {
  if (groupTags == NULL) {
    tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN);
    tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN);
    return TSDB_CODE_SUCCESS;
  }

  SNode* pCur = NULL;
  FOREACH(pCur, groupTags) {
    if (nodeType(pCur) != QUERY_NODE_COLUMN) {
      return TSDB_CODE_QRY_SYS_ERROR;
    }

    SColumnNode* pCol = (SColumnNode*)pCur;
    if (0 == strcmp(pCol->colName, GROUP_TAG_DB_NAME)) {
      supp->groupByDbName = true;
    }
    if (0 == strcmp(pCol->colName, GROUP_TAG_STABLE_NAME)) {
      supp->groupByStbName = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}

// Fill `supp` from the plan inputs of a table count scan: first the grouping
// flags / name filters, then the output slot ids of the group tag columns and
// of the count column. Slot ids that are not found stay at -1.
//
// Returns the error code of the first step that fails (each failure is
// logged), or TSDB_CODE_SUCCESS.
int32_t getTableCountScanSupp(SNodeList* groupTags, SName* tableName, SNodeList* scanCols, SNodeList* pseudoCols,
                              STableCountScanSupp* supp, SExecTaskInfo* taskInfo) {
  int32_t code = tblCountScanGetInputs(groupTags, tableName, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get inputs error", GET_TASKID(taskInfo));
    return code;
  }

  // -1 marks a column that is not part of the output
  supp->dbNameSlotId = -1;
  supp->stbNameSlotId = -1;
  supp->tbCountSlotId = -1;

  code = tblCountScanGetGroupTagsSlotId(scanCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get group tags slot id error", GET_TASKID(taskInfo));
    return code;
  }

  code = tblCountScanGetCountSlotId(pseudoCols, supp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s get table count scan supp. get count error", GET_TASKID(taskInfo));
  }
  return code;
}
S
shenglian zhou 已提交
3196

S
slzhou 已提交
3197
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableCountScanPhysiNode* pTblCountScanNode,
S
shenglian zhou 已提交
3198 3199 3200
                                                SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;

S
slzhou 已提交
3201
  SScanPhysiNode*              pScanNode = &pTblCountScanNode->scan;
S
slzhou 已提交
3202
  STableCountScanOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableCountScanOperatorInfo));
S
slzhou 已提交
3203
  SOperatorInfo*               pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
S
shenglian zhou 已提交
3204 3205 3206 3207 3208 3209 3210 3211 3212

  if (!pInfo || !pOperator) {
    goto _error;
  }

  pInfo->readHandle = *readHandle;

  SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
  initResultSizeInfo(&pOperator->resultInfo, 1);
3213
  pInfo->pRes = createDataBlockFromDescNode(pDescNode);
S
shenglian zhou 已提交
3214 3215
  blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

S
slzhou 已提交
3216 3217 3218
  getTableCountScanSupp(pTblCountScanNode->pGroupTags, &pTblCountScanNode->scan.tableName,
                        pTblCountScanNode->scan.pScanCols, pTblCountScanNode->scan.pScanPseudoCols, &pInfo->supp,
                        pTaskInfo);
S
shenglian zhou 已提交
3219 3220 3221

  setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
                  pInfo, pTaskInfo);
L
Liu Jicong 已提交
3222 3223
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
                                         optrDefaultBufFn, NULL);
S
shenglian zhou 已提交
3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234
  return pOperator;

_error:
  if (pInfo != NULL) {
    destoryTableCountScanOperator(pInfo);
  }
  taosMemoryFreeClear(pOperator);
  pTaskInfo->code = code;
  return NULL;
}

S
slzhou 已提交
3235 3236 3237
// Write one result row (row 0) of a table count scan into pRes.
//
// pSupp    slot ids of the output columns; a slot id of -1 means the column is not produced
// dbName   database name for the db-name column (must be non-empty when that column is produced)
// stbName  super table name for the stable-name column; an empty string is stored as NULL
// count    value for the table-count column
// pRes     result block; its row count is set to 1
//
// Names are stored as var-strings. The length header always equals the number
// of bytes actually copied, so a name that does not fit the buffer is
// truncated instead of producing a header that points past the buffer.
void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char* stbName, int64_t count,
                                 SSDataBlock* pRes) {
  if (pSupp->dbNameSlotId != -1) {
    ASSERT(strlen(dbName));
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);

    char   varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    size_t dbNameLen = strlen(dbName);
    if (dbNameLen >= TSDB_DB_NAME_LEN) {
      dbNameLen = TSDB_DB_NAME_LEN - 1;
    }
    memcpy(varDataVal(varDbName), dbName, dbNameLen);
    varDataSetLen(varDbName, dbNameLen);

    colDataSetVal(colInfoData, 0, varDbName, false);
  }

  if (pSupp->stbNameSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);

    size_t stbNameLen = strlen(stbName);
    if (stbNameLen != 0) {
      char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
      if (stbNameLen >= TSDB_TABLE_NAME_LEN) {
        stbNameLen = TSDB_TABLE_NAME_LEN - 1;
      }
      memcpy(varDataVal(varStbName), stbName, stbNameLen);
      varDataSetLen(varStbName, stbNameLen);

      colDataSetVal(colInfoData, 0, varStbName, false);
    } else {
      colDataSetNULL(colInfoData, 0);
    }
  }

  if (pSupp->tbCountSlotId != -1) {
    SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->tbCountSlotId);
    colDataSetVal(colInfoData, 0, (char*)&count, false);
  }

  pRes->info.rows = 1;
}

S
slzhou 已提交
3267
// Builds the table-count result when the scan is served from the system databases.
// Fetches the table counts via getInfosDbMeta/getPerfDbMeta, then delegates to the grouped
// or the filtered builder. Returns pInfo->pRes when it holds at least one row, NULL otherwise.
static SSDataBlock* buildSysDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo) {
  SSDataBlock*         pBlock = pInfo->pRes;
  STableCountScanSupp* pScanSupp = &pInfo->supp;

  // The first argument is passed as NULL; only the count out-parameter is read here.
  size_t numInfoTables;
  getInfosDbMeta(NULL, &numInfoTables);
  size_t numPerfTables;
  getPerfDbMeta(NULL, &numPerfTables);

  bool grouped = pScanSupp->groupByDbName || pScanSupp->groupByStbName;
  if (grouped) {
    buildSysDbGroupedTableCount(pOperator, pInfo, pScanSupp, pBlock, numInfoTables, numPerfTables);
  } else {
    buildSysDbFilterTableCount(pOperator, pScanSupp, pBlock, numInfoTables, numPerfTables);
  }

  if (pBlock->info.rows > 0) {
    return pBlock;
  }
  return NULL;
}

S
slzhou 已提交
3285 3286 3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300
// Non-grouped system-database count: emits at most one row depending on pSupp->dbNameFilter.
//   filter == TSDB_INFORMATION_SCHEMA_DB -> infodbTableNum
//   filter == TSDB_PERFORMANCE_SCHEMA_DB -> perfdbTableNum
//   filter is empty                      -> sum of both, with an empty db name
// Any other filter produces no row. The operator is marked completed in every case.
static void buildSysDbFilterTableCount(SOperatorInfo* pOperator, STableCountScanSupp* pSupp, SSDataBlock* pRes,
                                       size_t infodbTableNum, size_t perfdbTableNum) {
  const char* dbFilter = pSupp->dbNameFilter;

  if (0 == strcmp(dbFilter, TSDB_INFORMATION_SCHEMA_DB)) {
    fillTableCountScanDataBlock(pSupp, TSDB_INFORMATION_SCHEMA_DB, "", infodbTableNum, pRes);
  } else if (0 == strcmp(dbFilter, TSDB_PERFORMANCE_SCHEMA_DB)) {
    fillTableCountScanDataBlock(pSupp, TSDB_PERFORMANCE_SCHEMA_DB, "", perfdbTableNum, pRes);
  } else if (dbFilter[0] == '\0') {
    size_t totalTableNum = infodbTableNum + perfdbTableNum;
    fillTableCountScanDataBlock(pSupp, "", "", totalTableNum, pRes);
  }

  setOperatorCompleted(pOperator);
}

// Grouped system-database count: each call emits one group, selected by pInfo->currGrpIdx.
//   index 0 -> TSDB_INFORMATION_SCHEMA_DB with infodbTableNum
//   index 1 -> TSDB_PERFORMANCE_SCHEMA_DB with perfdbTableNum
//   other   -> no row; the operator is marked completed
// The group id is derived from the db name when grouping by db name, otherwise from the empty string.
// currGrpIdx is advanced on every call.
static void buildSysDbGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, size_t infodbTableNum,
                                        size_t perfdbTableNum) {
  char*  sysDbName = NULL;
  size_t sysDbTableNum = 0;

  switch (pInfo->currGrpIdx) {
    case 0:
      sysDbName = TSDB_INFORMATION_SCHEMA_DB;
      sysDbTableNum = infodbTableNum;
      break;
    case 1:
      sysDbName = TSDB_PERFORMANCE_SCHEMA_DB;
      sysDbTableNum = perfdbTableNum;
      break;
    default:
      break;
  }

  if (sysDbName == NULL) {
    // Both system databases have already been reported.
    setOperatorCompleted(pOperator);
  } else {
    uint64_t grpId = 0;
    if (pSupp->groupByDbName) {
      grpId = calcGroupId(sysDbName, strlen(sysDbName));
    } else {
      grpId = calcGroupId("", 0);
    }

    pRes->info.id.groupId = grpId;
    fillTableCountScanDataBlock(pSupp, sysDbName, "", sysDbTableNum, pRes);
  }

  pInfo->currGrpIdx++;
}

S
shenglian zhou 已提交
3326
// Entry point for fetching the next block from the table-count scan operator.
// Clears the result block first, returns NULL once the operator status is OP_EXEC_DONE,
// and otherwise dispatches on readHandle.mnd: non-NULL goes to the system-database builder,
// NULL goes to the vnode builder.
static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator) {
  STableCountScanOperatorInfo* pScanInfo = pOperator->info;
  SSDataBlock*                 pBlock = pScanInfo->pRes;

  blockDataCleanup(pBlock);

  if (OP_EXEC_DONE == pOperator->status) {
    return NULL;
  }

  if (pScanInfo->readHandle.mnd == NULL) {
    return buildVnodeDbTableCount(pOperator, pScanInfo, &pScanInfo->supp, pBlock);
  }

  return buildSysDbTableCount(pOperator, pScanInfo);
}

// Builds the table-count result for the vnode referenced by pInfo->readHandle.
// Resolves the db name from the vnode info, then delegates to the grouped or the filtered
// builder. Returns pRes when it holds at least one row, NULL otherwise.
static SSDataBlock* buildVnodeDbTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                           STableCountScanSupp* pSupp, SSDataBlock* pRes) {
  int32_t     vnodeId = 0;
  const char* fullDbName = NULL;

  // The name reported by the vnode is parsed as account + db, then reduced to the db name.
  vnodeGetInfo(pInfo->readHandle.vnode, &fullDbName, &vnodeId);

  SName parsedName = {0};
  tNameFromString(&parsedName, fullDbName, T_NAME_ACCT | T_NAME_DB);

  char dbName[TSDB_DB_NAME_LEN] = {0};
  tNameGetDbName(&parsedName, dbName);

  bool grouped = pSupp->groupByDbName || pSupp->groupByStbName;
  if (!grouped) {
    buildVnodeFilteredTbCount(pOperator, pInfo, pSupp, pRes, dbName);
  } else {
    buildVnodeGroupedTableCount(pOperator, pInfo, pSupp, pRes, vnodeId, dbName);
  }

  if (pRes->info.rows > 0) {
    return pRes;
  }
  return NULL;
}

// Grouped vnode count; emits one group per call.
//
// Not grouping by stable name: a single row with the vnode's total table count, then the
// operator is marked completed.
//
// Grouping by stable name: the stable uid list is fetched on first use and kept in
// pInfo->stbUidList. pInfo->currGrpIdx walks that list (one stable per call); the call after
// the last stable reports the metaGetNtbNum group, and the call after that marks the operator
// completed.
static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                        STableCountScanSupp* pSupp, SSDataBlock* pRes, int32_t vgId, char* dbName) {
  if (!pSupp->groupByStbName) {
    uint64_t dbGroupId = calcGroupId(dbName, strlen(dbName));
    pRes->info.id.groupId = dbGroupId;

    int64_t numTables = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", numTables, pRes);
    setOperatorCompleted(pOperator);
    return;
  }

  if (NULL == pInfo->stbUidList) {
    pInfo->stbUidList = taosArrayInit(16, sizeof(tb_uid_t));
    // A failure is only logged; the scan continues with whatever the list holds.
    if (vnodeGetStbIdList(pInfo->readHandle.vnode, 0, pInfo->stbUidList) < 0) {
      qError("vgId:%d, failed to get stb id list error: %s", vgId, terrstr());
    }
  }

  size_t numStb = taosArrayGetSize(pInfo->stbUidList);
  if (pInfo->currGrpIdx < numStb) {
    tb_uid_t* pStbUid = taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx);
    buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, *pStbUid);
    pInfo->currGrpIdx++;
  } else if (pInfo->currGrpIdx == numStb) {
    buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName);
    pInfo->currGrpIdx++;
  } else {
    setOperatorCompleted(pOperator);
  }
}

// Non-grouped vnode count: emits exactly one row and marks the operator completed.
// When both the db-name filter and the stable-name filter are non-empty, the row carries the
// ctbNum reported by metaGetStbStats for that stable; in every other case it carries the
// vnode's total table count from metaGetTbNum with an empty stable name.
static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo,
                                      STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName) {
  bool hasDbFilter = (pSupp->dbNameFilter[0] != '\0');
  bool hasStbFilter = (pSupp->stbNameFilter[0] != '\0');

  if (hasDbFilter && hasStbFilter) {
    tb_uid_t stbUid = metaGetTableEntryUidByName(pInfo->readHandle.meta, pSupp->stbNameFilter);

    SMetaStbStats stbStats = {0};
    metaGetStbStats(pInfo->readHandle.meta, stbUid, &stbStats);

    int64_t numChildTables = stbStats.ctbNum;
    fillTableCountScanDataBlock(pSupp, dbName, pSupp->stbNameFilter, numChildTables, pRes);
  } else {
    int64_t numVnodeTables = metaGetTbNum(pInfo->readHandle.meta);
    fillTableCountScanDataBlock(pSupp, dbName, "", numVnodeTables, pRes);
  }

  setOperatorCompleted(pOperator);
}

// Emits the group for tables counted by metaGetNtbNum (presumably normal, non-child tables).
// The group id is computed from "<dbName>." when grouping by db name, otherwise from the empty
// string, and is stored in pRes even when no row is produced. A row is written only when the
// count is non-zero.
static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName) {
  char groupKey[TSDB_TABLE_FNAME_LEN] = {0};
  if (pSupp->groupByDbName) {
    snprintf(groupKey, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, "");
  }

  pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));

  int64_t numNtb = metaGetNtbNum(pInfo->readHandle.meta);
  if (numNtb == 0) {
    return;
  }
  fillTableCountScanDataBlock(pSupp, dbName, "", numNtb, pRes);
}

// Emits the group for one super table identified by stbUid.
// The stable name is looked up by uid; the group id is computed from "<dbName>.<stbName>" when
// grouping by db name and from "<stbName>" otherwise. The row carries the ctbNum reported by
// metaGetStbStats.
static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp,
                                           SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) {
  char stbName[TSDB_TABLE_NAME_LEN] = {0};
  metaGetTableSzNameByUid(pInfo->readHandle.meta, stbUid, stbName);

  char groupKey[TSDB_TABLE_FNAME_LEN] = {0};
  if (!pSupp->groupByDbName) {
    snprintf(groupKey, TSDB_TABLE_FNAME_LEN, "%s", stbName);
  } else {
    snprintf(groupKey, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, stbName);
  }

  pRes->info.id.groupId = calcGroupId(groupKey, strlen(groupKey));

  SMetaStbStats stbStats = {0};
  metaGetStbStats(pInfo->readHandle.meta, stbUid, &stbStats);

  int64_t numChildTables = stbStats.ctbNum;
  fillTableCountScanDataBlock(pSupp, dbName, stbName, numChildTables, pRes);
}

static void destoryTableCountScanOperator(void* param) {
S
slzhou 已提交
3451
  STableCountScanOperatorInfo* pTableCountScanInfo = param;
S
shenglian zhou 已提交
3452 3453
  blockDataDestroy(pTableCountScanInfo->pRes);

S
slzhou 已提交
3454
  taosArrayDestroy(pTableCountScanInfo->stbUidList);
S
shenglian zhou 已提交
3455 3456
  taosMemoryFreeClear(param);
}